package org.apache.pulsar.client.impl;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.EventExecutor;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionCoordinatorClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.internal.DefaultImplementation;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"broker-impl"})
/* loaded from: input_file:org/apache/pulsar/client/impl/TransactionEndToEndTest.class */
public class TransactionEndToEndTest extends TransactionTestBase {
    /** Logger shared by all tests in this class. */
    private static final Logger log = LoggerFactory.getLogger(TransactionEndToEndTest.class);
    // NOTE(review): 3 is also the partition count used when TOPIC_OUTPUT is (re)created below;
    // presumably this constant is meant to be that count - confirm.
    protected static final int TOPIC_PARTITION = 3;
    /** Partitioned topic that the produce/commit/abort tests publish to and consume from. */
    protected static final String TOPIC_OUTPUT = "tnx/ns1/output";
    /** Partitioned topic (created with a single partition) used by {@code txnMessageAckTest}. */
    protected static final String TOPIC_MESSAGE_ACK_TEST = "tnx/ns1/message-ack-test";
    // Passed as the second argument of setUpBase(...) in setup(); its exact meaning is defined by
    // setUpBase, which is not visible here - TODO confirm.
    protected static final int NUM_PARTITIONS = 16;
    // NOTE(review): not referenced by name in the visible part of the file; judging by the names these
    // are receive timeouts in seconds (the tests below use a literal 5) - confirm before relying on them.
    private static final int waitTimeForCanReceiveMsgInSec = 5;
    private static final int waitTimeForCannotReceiveMsgInSec = 5;

    /**
     * One-time class setup: enables batch-index-level acknowledgment on the configuration object,
     * delegates the rest of the environment bootstrap to {@code setUpBase(...)} (defined outside
     * this excerpt) and creates the single-partition topic used by the message-ack test.
     *
     * @throws Exception if the bootstrap or the topic creation fails
     */
    @BeforeClass
    protected void setup() throws Exception {
        // Same partition count that resetTopicOutput() uses when it re-creates TOPIC_OUTPUT.
        final int outputTopicPartitions = 3;
        final int ackTestTopicPartitions = 1;

        conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
        setUpBase(1, NUM_PARTITIONS, TOPIC_OUTPUT, outputTopicPartitions);
        admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, ackTestTopicPartitions);
    }

    /**
     * Drops and re-creates the two shared partitioned topics so that a following test starts from
     * empty topics: {@link #TOPIC_OUTPUT} with 3 partitions and {@link #TOPIC_MESSAGE_ACK_TEST}
     * with 1 partition. Both deletions pass {@code true} as the second (force) argument.
     *
     * @throws Exception if any admin call fails
     */
    protected void resetTopicOutput() throws Exception {
        final boolean force = true;

        admin.topics().deletePartitionedTopic(TOPIC_OUTPUT, force);
        admin.topics().createPartitionedTopic(TOPIC_OUTPUT, 3);

        admin.topics().deletePartitionedTopic(TOPIC_MESSAGE_ACK_TEST, force);
        admin.topics().createPartitionedTopic(TOPIC_MESSAGE_ACK_TEST, 1);
    }

    /**
     * Class-level teardown: delegates to the parent class's {@code internalCleanup()}.
     * {@code alwaysRun = true} asks TestNG to invoke this method even when earlier
     * configuration or test methods failed or were skipped.
     */
    @AfterClass(alwaysRun = true)
    protected void cleanup() {
        // Explicit super call: always runs the parent implementation, never an override.
        super.internalCleanup();
    }

    /**
     * TestNG data provider that runs a test twice: once with {@code true} and once with
     * {@code false} as its single boolean argument (used by the tests to toggle producer batching).
     *
     * @return two one-element rows, {@code {TRUE}} and {@code {FALSE}}
     */
    @DataProvider(name = "enableBatch")
    public Object[][] enableBatch() {
        // The array must be two-dimensional to match the declared return type.
        return new Object[][]{
                {Boolean.TRUE},
                {Boolean.FALSE}
        };
    }

    /**
     * Sends one batch of 9 messages and acknowledges them in three groups of 3: the first group in
     * a transaction that is left open, the second in a transaction that is then aborted, and the
     * third without a transaction. After the abort only the second group (values 3, 4, 5) must be
     * redelivered, and nothing else may arrive afterwards.
     *
     * <p>Producer and consumer are closed on every exit path.
     *
     * @throws Exception if any client operation fails
     */
    @Test
    private void testIndividualAckAbortFilterAckSetInPendingAckState() throws Exception {
        final String topic = "tnx/ns1/testIndividualAckAbortFilterAckSetInPendingAckState";
        final int messageCount = 9;
        final int groupSize = 3;
        final int noMessageTimeoutSec = 5;

        try (Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32)
                     .topic(topic)
                     .enableBatching(true)
                     // Long delay + max == messageCount: all messages end up in one batch.
                     .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                     .batchingMaxMessages(messageCount)
                     .create();
             Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
                     .topic(topic)
                     .isAckReceiptEnabled(true)
                     .subscriptionName("test")
                     .subscriptionType(SubscriptionType.Shared)
                     .enableBatchIndexAcknowledgment(true)
                     .subscribe()) {
            for (int value = 0; value < messageCount; value++) {
                producer.sendAsync(value);
            }

            Transaction openTxn = getTxn();
            Transaction abortedTxn = getTxn();

            // Values 0..2: acknowledged in a transaction that is never ended.
            for (int n = 0; n < groupSize; n++) {
                consumer.acknowledgeAsync(consumer.receive().getMessageId(), openTxn).get();
            }
            // Values 3..5: acknowledged in the transaction that gets aborted.
            for (int n = 0; n < groupSize; n++) {
                consumer.acknowledgeAsync(consumer.receive().getMessageId(), abortedTxn).get();
            }
            // Values 6..8: plain, non-transactional acknowledgment.
            for (int n = 0; n < groupSize; n++) {
                consumer.acknowledgeAsync(consumer.receive()).get();
            }

            abortedTxn.abort().get();

            // Only the messages of the aborted transaction come back, in order.
            for (int n = 0; n < groupSize; n++) {
                Assert.assertEquals(consumer.receive().getValue().intValue(), n + groupSize);
            }
            Assert.assertNull(consumer.receive(noMessageTimeoutSec, TimeUnit.SECONDS));
        }
    }

    /**
     * Sends 10 messages, acknowledges the first 5 in one transaction and the last 5 in another,
     * commits the second transaction and then re-subscribes. While the first transaction is still
     * open the new consumer must receive nothing; once it is aborted, exactly the first 5 messages
     * (values 0..4, in order) must be redelivered.
     *
     * <p>The producer and whichever consumer is current are closed on every exit path.
     *
     * @param z {@code true} to publish with batching enabled (one batch of 10), {@code false} to
     *          publish unbatched
     * @throws Exception if any client operation fails
     */
    @Test(dataProvider = "enableBatch")
    private void testFilterMsgsInPendingAckStateWhenConsumerDisconnect(boolean z) throws Exception {
        final String topic = "tnx/ns1/testFilterMsgsInPendingAckStateWhenConsumerDisconnect-" + z;
        final int messageCount = 10;
        final int half = 5;
        final int receiveTimeoutSec = 5;

        try (Producer<Integer> producer = z
                ? pulsarClient.newProducer(Schema.INT32)
                        .topic(topic)
                        .enableBatching(true)
                        .batchingMaxPublishDelay(1L, TimeUnit.HOURS)
                        .batchingMaxMessages(messageCount)
                        .create()
                : pulsarClient.newProducer(Schema.INT32)
                        .topic(topic)
                        .enableBatching(false)
                        .create()) {
            Consumer<Integer> consumer = pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .isAckReceiptEnabled(true)
                    .subscriptionName("test")
                    .subscriptionType(SubscriptionType.Shared)
                    .enableBatchIndexAcknowledgment(true)
                    .subscribe();
            try {
                for (int value = 0; value < messageCount; value++) {
                    producer.sendAsync(value);
                }

                Transaction abortedTxn = getTxn();
                Transaction committedTxn = getTxn();
                for (int n = 0; n < half; n++) {
                    consumer.acknowledgeAsync(consumer.receive().getMessageId(), abortedTxn).get();
                }
                for (int n = half; n < messageCount; n++) {
                    consumer.acknowledgeAsync(consumer.receive().getMessageId(), committedTxn).get();
                }
                committedTxn.commit().get();

                // Disconnect and subscribe again with the same subscription.
                consumer.close();
                consumer = pulsarClient.newConsumer(Schema.INT32)
                        .topic(topic)
                        .isAckReceiptEnabled(true)
                        .subscriptionName("test")
                        .subscriptionType(SubscriptionType.Shared)
                        .enableBatchIndexAcknowledgment(true)
                        .subscribe();

                // The first half is still held by the open transaction.
                Assert.assertNull(consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS));

                abortedTxn.abort().get();

                int redelivered = 0;
                Message<Integer> message;
                while ((message = consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS)) != null) {
                    Assert.assertEquals(message.getValue().intValue(), redelivered);
                    redelivered++;
                }
                Assert.assertEquals(redelivered, half);
            } finally {
                // Closes the re-subscribed consumer on success and the current one on failure.
                consumer.close();
            }
        }
    }

    /**
     * Publishes 6 messages; a first consumer acknowledges 2 of them in a transaction that stays
     * open and 2 more in a transaction that is committed, leaving 2 untouched. A second consumer
     * with a receiver queue of 2 then subscribes and must still receive the 2 untouched messages
     * (values 4 and 5), i.e. the messages held in pending-ack state must not block delivery.
     *
     * <p>Producer and consumers are managed with try-with-resources, so each is closed exactly
     * once on every exit path.
     *
     * @throws Exception if any client operation fails
     */
    @Test
    private void testMsgsInPendingAckStateWouldNotGetTheConsumerStuck() throws Exception {
        final String topic = "tnx/ns1/testMsgsInPendingAckStateWouldNotGetTheConsumerStuck";
        final int ackedInOpenTxn = 2;
        final int ackedInCommittedTxn = 2;
        final int neverAcked = 2;
        final int totalMessages = ackedInOpenTxn + ackedInCommittedTxn + neverAcked;

        try (Producer<Integer> producer = pulsarClient.newProducer(Schema.INT32).topic(topic).create()) {
            try (Consumer<Integer> firstConsumer = pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .subscriptionName("test")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                for (int value = 0; value < totalMessages; value++) {
                    producer.send(value);
                }

                Transaction openTxn = getTxn();
                Transaction committedTxn = getTxn();
                for (int n = 0; n < ackedInOpenTxn; n++) {
                    firstConsumer.acknowledgeAsync(firstConsumer.receive().getMessageId(), openTxn).get();
                }
                for (int n = 0; n < ackedInCommittedTxn; n++) {
                    firstConsumer.acknowledgeAsync(firstConsumer.receive().getMessageId(), committedTxn).get();
                }
                committedTxn.commit().get();
            } // first consumer is closed here, before the second one subscribes

            try (Consumer<Integer> secondConsumer = pulsarClient.newConsumer(Schema.INT32)
                    .topic(topic)
                    .receiverQueueSize(2)
                    .subscriptionName("test")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                for (int n = 0; n < neverAcked; n++) {
                    Assert.assertEquals(
                            secondConsumer.receive(3, TimeUnit.SECONDS).getValue().intValue(),
                            ackedInOpenTxn + ackedInCommittedTxn + n);
                }
            }
        }
    }

    /**
     * Publishes 1000 messages spread over two transactions (every fifth message in the first, the
     * rest in the second) and verifies that none is visible before commit, that all 1000 are
     * delivered after both transactions commit, and that nothing else follows.
     *
     * <p>Producer and consumer are closed (producer first) on every exit path; on success the
     * shared topics are reset afterwards.
     *
     * @param z whether the producer batches messages
     * @throws Exception if any client or admin operation fails
     */
    @Test(dataProvider = "enableBatch")
    private void produceCommitTest(boolean z) throws Exception {
        final int messageCount = 1000;
        final int receiveTimeoutSec = 5;

        try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(TOPIC_OUTPUT)
                .subscriptionName("test")
                .enableBatchIndexAcknowledgment(true)
                .subscribe()) {
            Awaitility.await().until(consumer::isConnected);

            try (Producer<byte[]> producer = pulsarClient.newProducer()
                    .topic(TOPIC_OUTPUT)
                    .enableBatching(z)
                    .sendTimeout(0, TimeUnit.SECONDS)
                    .create()) {
                Transaction firstTxn = getTxn();
                Transaction secondTxn = getTxn();

                for (int index = 0; index < messageCount; index++) {
                    Transaction owner = (index % 5 == 0) ? firstTxn : secondTxn;
                    producer.newMessage(owner)
                            .value(("Hello Txn - " + index).getBytes(StandardCharsets.UTF_8))
                            .sendAsync();
                }

                // Nothing is visible while both transactions are still open.
                Assert.assertNull(consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS));

                firstTxn.commit().get();
                secondTxn.commit().get();

                for (int received = 0; received < messageCount; received++) {
                    Assert.assertNotNull(consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS));
                }
                Assert.assertNull(consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS));
            }
        }

        // Reached only on success, after producer and consumer have been closed.
        resetTopicOutput();
        log.info("message commit test enableBatch {}", z);
    }

    /**
     * Publishes 10 unbatched messages inside a transaction, then verifies that a consumer starting
     * from the earliest position sees nothing either before or after the transaction is aborted.
     * Finally waits until, on every partition of {@link #TOPIC_OUTPUT}, the subscription's
     * mark-delete position equals the last confirmed entry or is the position immediately before it.
     *
     * <p>Producer and consumer are closed on every exit path; on success the shared topics are
     * reset afterwards.
     *
     * @throws Exception if any client, admin or reflective operation fails
     */
    @Test
    public void produceAbortTest() throws Exception {
        final String subscriptionName = "test";
        final int messageCount = 10;
        final int partitionCount = 3;
        final int receiveTimeoutSec = 5;

        Transaction txn = getTxn();
        try (Producer<byte[]> producer = pulsarClient.newProducer()
                .topic(TOPIC_OUTPUT)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(false)
                .create()) {
            for (int index = 0; index < messageCount; index++) {
                producer.newMessage(txn)
                        .value(("Hello Txn - " + index).getBytes(StandardCharsets.UTF_8))
                        .send();
            }

            try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                    .topic(TOPIC_OUTPUT)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionName(subscriptionName)
                    .enableBatchIndexAcknowledgment(true)
                    .subscribe()) {
                Awaitility.await().until(consumer::isConnected);

                // Invisible while the transaction is open, and still invisible once it is aborted.
                Assert.assertNull(consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS));
                txn.abort().get();
                Assert.assertNull(consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS));

                Awaitility.await().until(() -> {
                    // The broker's topic map is read reflectively from the "topics" field.
                    Field topicsField = BrokerService.class.getDeclaredField("topics");
                    topicsField.setAccessible(true);

                    boolean markDeleteCaughtUp = true;
                    for (int partition = 0; partition < partitionCount; partition++) {
                        String partitionTopic = TopicName.get(TOPIC_OUTPUT).getPartition(partition).toString();
                        boolean topicFound = false;
                        for (int broker = 0; broker < getPulsarServiceList().size(); broker++) {
                            CompletableFuture topicFuture = (CompletableFuture) ((ConcurrentOpenHashMap) topicsField
                                    .get(getPulsarServiceList().get(broker).getBrokerService())).get(partitionTopic);
                            if (topicFuture == null) {
                                continue;
                            }
                            Optional topicOptional = (Optional) topicFuture.get();
                            if (!topicOptional.isPresent()) {
                                continue;
                            }
                            PersistentSubscription subscription =
                                    ((Topic) topicOptional.get()).getSubscription(subscriptionName);
                            PositionImpl markDeleted = subscription.getCursor().getMarkDeletedPosition();
                            Position lastConfirmed =
                                    subscription.getCursor().getManagedLedger().getLastConfirmedEntry();
                            topicFound = true;
                            // Accept "exactly at" or "one valid position before" the last confirmed entry.
                            if (!markDeleted.equals(lastConfirmed)
                                    && !subscription.getCursor().getManagedLedger()
                                            .getNextValidPosition(markDeleted).equals(lastConfirmed)) {
                                log.error("Mark delete position is not commit marker position!");
                                markDeleteCaughtUp = false;
                            }
                        }
                        // Every partition must be loaded on at least one broker.
                        Assert.assertTrue(topicFound);
                    }
                    return markDeleteCaughtUp;
                });
            }
        }

        resetTopicOutput();
        log.info("finished test partitionAbortTest");
    }

    /**
     * Publishes 50 messages, acknowledges 25 of them inside a transaction, commits it and then
     * checks on the broker that the subscription's first consumer reports 25 unacknowledged
     * messages.
     *
     * <p>The test waits on the send futures themselves, so a failed send fails the test instead of
     * blocking forever. Producer and consumer are closed on every exit path; on success the topic
     * is force-deleted afterwards.
     *
     * @param z whether the producer batches messages (batches of at most 10)
     * @throws Exception if any client, broker or admin operation fails
     */
    @Test(dataProvider = "enableBatch")
    private void testAckWithTransactionReduceUnAckMessageCount(boolean z) throws Exception {
        final String topic = "tnx/ns1/testAckWithTransactionReduceUnAckMessageCount-" + z;
        final String subscriptionName = "testAckWithTransactionReduceUnAckMessageCount";
        final int messageCount = 50;
        final int ackedCount = 25;
        final int receiveTimeoutSec = 5;

        try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subscriptionName)
                .subscriptionType(SubscriptionType.Shared)
                .isAckReceiptEnabled(true)
                .subscribe()) {
            Awaitility.await().until(consumer::isConnected);

            try (Producer<byte[]> producer = pulsarClient.newProducer()
                    .topic(topic)
                    .enableBatching(z)
                    .batchingMaxMessages(10)
                    .create()) {
                List<CompletableFuture<?>> pendingSends = new ArrayList<>(messageCount);
                for (int index = 0; index < messageCount; index++) {
                    // Payload is the decimal text of the index.
                    pendingSends.add(producer.sendAsync(String.valueOf(index).getBytes(StandardCharsets.UTF_8)));
                }
                // Completes when every send has completed; rethrows if any of them failed.
                CompletableFuture.allOf(pendingSends.toArray(new CompletableFuture[0])).get();

                Transaction txn = getTxn();
                for (int n = 0; n < ackedCount; n++) {
                    consumer.acknowledgeAsync(
                            consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS).getMessageId(), txn).get();
                }
                txn.commit().get();

                boolean topicFound = false;
                String fullTopicName = TopicName.get(topic).toString();
                for (int broker = 0; broker < getPulsarServiceList().size(); broker++) {
                    CompletableFuture topicFuture =
                            getPulsarServiceList().get(broker).getBrokerService().getTopic(fullTopicName, false);
                    if (topicFuture == null) {
                        continue;
                    }
                    Optional topicOptional = (Optional) topicFuture.get();
                    if (!topicOptional.isPresent()) {
                        continue;
                    }
                    Assert.assertEquals(
                            ((org.apache.pulsar.broker.service.Consumer) ((Topic) topicOptional.get())
                                    .getSubscription(subscriptionName).getConsumers().get(0)).getUnackedMessages(),
                            messageCount - ackedCount);
                    topicFound = true;
                }
                Assert.assertTrue(topicFound);
            }
        }

        admin.topics().delete(topic, true);
    }

    /**
     * Runs {@link #txnAckTest} with batching disabled (max 1 message per batch) on a
     * {@code Shared} subscription.
     */
    @Test
    public void txnIndividualAckTestNoBatchAndSharedSub() throws Exception {
        final boolean batchingEnabled = false;
        final int maxBatchedMessages = 1;
        txnAckTest(batchingEnabled, maxBatchedMessages, SubscriptionType.Shared);
    }

    /**
     * Runs {@link #txnAckTest} with batching enabled (up to 200 messages per batch) on a
     * {@code Shared} subscription.
     */
    @Test
    public void txnIndividualAckTestBatchAndSharedSub() throws Exception {
        final boolean batchingEnabled = true;
        final int maxBatchedMessages = 200;
        txnAckTest(batchingEnabled, maxBatchedMessages, SubscriptionType.Shared);
    }

    /**
     * Runs {@link #txnAckTest} with batching disabled (max 1 message per batch) on a
     * {@code Failover} subscription.
     */
    @Test
    public void txnIndividualAckTestNoBatchAndFailoverSub() throws Exception {
        final boolean batchingEnabled = false;
        final int maxBatchedMessages = 1;
        txnAckTest(batchingEnabled, maxBatchedMessages, SubscriptionType.Failover);
    }

    /**
     * Runs {@link #txnAckTest} with batching enabled (up to 200 messages per batch) on a
     * {@code Failover} subscription.
     */
    @Test
    public void txnIndividualAckTestBatchAndFailoverSub() throws Exception {
        final boolean batchingEnabled = true;
        final int maxBatchedMessages = 200;
        txnAckTest(batchingEnabled, maxBatchedMessages, SubscriptionType.Failover);
    }

    /**
     * Shared scenario for the individual-ack transaction tests. Runs two rounds; each round
     * <ol>
     *   <li>publishes 1000 non-transactional messages,</li>
     *   <li>receives and acknowledges all of them inside a transaction, verifies nothing else is
     *       pending, then aborts that transaction,</li>
     *   <li>receives the same 1000 messages again, acknowledges them in a second transaction and
     *       commits it, after which nothing may be redelivered,</li>
     *   <li>reflectively forces the committed transaction's client-side state back to
     *       {@code OPEN} and verifies that committing it again fails with a
     *       {@code TransactionNotFoundException} cause.</li>
     * </ol>
     * Producer and consumer are closed (producer first) on every exit path; on success the topic
     * is force-deleted afterwards.
     *
     * @param z                whether the producer batches messages
     * @param i                maximum number of messages per batch
     * @param subscriptionType subscription type used by the consumer
     * @throws Exception if any client, admin or reflective operation fails
     */
    public void txnAckTest(boolean z, int i, SubscriptionType subscriptionType) throws Exception {
        final String topic = "tnx/ns1/normal-topic";
        final int messageCount = 1000;
        final int rounds = 2;
        final int receiveTimeoutSec = 5;

        try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("test")
                .enableBatchIndexAcknowledgment(true)
                .subscriptionType(subscriptionType)
                .subscribe()) {
            Awaitility.await().until(consumer::isConnected);

            try (Producer<byte[]> producer = pulsarClient.newProducer()
                    .topic(topic)
                    .enableBatching(z)
                    .batchingMaxMessages(i)
                    .create()) {
                for (int round = 0; round < rounds; round++) {
                    Transaction abortedTxn = getTxn();
                    for (int n = 0; n < messageCount; n++) {
                        producer.newMessage().value("hello".getBytes(StandardCharsets.UTF_8)).sendAsync();
                    }

                    // First pass: ack everything in a transaction that is then aborted.
                    for (int n = 0; n < messageCount; n++) {
                        Message<byte[]> message = consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS);
                        Assert.assertNotNull(message);
                        log.info("receive msgId: {}, count : {}", message.getMessageId(), n);
                        consumer.acknowledgeAsync(message.getMessageId(), abortedTxn).get();
                    }
                    Assert.assertNull(consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS));
                    abortedTxn.abort().get();

                    // Second pass: the abort makes all messages available again; ack and commit.
                    Transaction committedTxn = getTxn();
                    for (int n = 0; n < messageCount; n++) {
                        Message<byte[]> message = consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS);
                        Assert.assertNotNull(message);
                        consumer.acknowledgeAsync(message.getMessageId(), committedTxn).get();
                        log.info("receive msgId: {}, count: {}", message.getMessageId(), n);
                    }
                    committedTxn.commit().get();
                    Assert.assertNull(consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS));

                    // Re-open the transaction on the client side only, so that a second commit
                    // request is actually sent.
                    Field stateField = TransactionImpl.class.getDeclaredField("state");
                    stateField.setAccessible(true);
                    stateField.set(committedTxn, Transaction.State.OPEN);
                    try {
                        committedTxn.commit().get();
                        Assert.fail("recommit one transaction should be failed.");
                    } catch (ExecutionException e) {
                        log.info("expected exception for recommit one transaction.");
                        Assert.assertTrue(e.getCause()
                                instanceof TransactionCoordinatorClientException.TransactionNotFoundException);
                    }
                }
            }
        }

        admin.topics().delete(topic, true);
    }

    /**
     * Creates a topic with a subscription and deletes it again, then verifies that a different
     * topic in the same namespace can still be produced to and consumed from.
     *
     * <p>Producer and consumer are closed on every exit path; on success the second topic is
     * force-deleted afterwards.
     *
     * @throws Exception if any client or admin operation fails
     */
    @Test
    public void testAfterDeleteTopicOtherTopicCanRecover() throws Exception {
        final String deletedTopic = "persistent://tnx/ns1/topic-one";
        final String survivingTopic = "persistent://tnx/ns1/topic-two";
        final String subscriptionName = "test";
        final String payload = "test";

        admin.topics().createNonPartitionedTopic(deletedTopic);
        admin.topics().createSubscription(deletedTopic, subscriptionName, MessageId.earliest);
        admin.topics().delete(deletedTopic);

        try (Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                     .topic(survivingTopic)
                     .create();
             Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                     .topic(survivingTopic)
                     .subscriptionName(subscriptionName)
                     .subscribe()) {
            producer.send(payload);
            Assert.assertEquals(consumer.receive(5, TimeUnit.SECONDS).getValue(), payload);
        }

        admin.topics().delete(survivingTopic, true);
    }

    /**
     * Publishes 10 messages in a transaction to {@link #TOPIC_MESSAGE_ACK_TEST} and verifies:
     * <ol>
     *   <li>nothing is delivered before the transaction commits;</li>
     *   <li>after commit all 10 arrive; every second one is acknowledged, and the cursor's
     *       mark-delete position reported by the admin API differs from the last confirmed
     *       entry;</li>
     *   <li>after {@code redeliverUnacknowledgedMessages()} exactly the unacknowledged messages
     *       arrive again and are acknowledged;</li>
     *   <li>on the broker, the subscription's mark-delete position then equals the last confirmed
     *       entry or is the position immediately before it.</li>
     * </ol>
     * Producer and consumer are closed (producer first) on every exit path; on success the shared
     * topics are reset afterwards.
     *
     * @throws Exception if any client, admin or reflective operation fails
     */
    @Test
    public void txnMessageAckTest() throws Exception {
        final String subscriptionName = "test";
        final int messageCount = 10;
        final int receiveTimeoutSec = 5;
        int redeliveredCount = 0;

        try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .topic(TOPIC_MESSAGE_ACK_TEST)
                .subscriptionName(subscriptionName)
                .enableBatchIndexAcknowledgment(true)
                .acknowledgmentGroupTime(0L, TimeUnit.MILLISECONDS)
                .subscribe()) {
            Awaitility.await().until(consumer::isConnected);

            try (Producer<byte[]> producer = pulsarClient.newProducer()
                    .topic(TOPIC_MESSAGE_ACK_TEST)
                    .sendTimeout(0, TimeUnit.SECONDS)
                    .enableBatching(false)
                    .create()) {
                Transaction txn = getTxn();
                for (int index = 0; index < messageCount; index++) {
                    producer.newMessage(txn)
                            .value(("Hello Txn - " + index).getBytes(StandardCharsets.UTF_8))
                            .sendAsync();
                }
                log.info("produce transaction messages finished");

                Assert.assertNull(consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS));
                log.info("transaction messages can't be received before transaction committed");

                txn.commit().get();

                // First pass: acknowledge only the messages at even positions.
                int ackedCount = 0;
                for (int index = 0; index < messageCount; index++) {
                    Message<byte[]> message = consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS);
                    Assert.assertNotNull(message);
                    if (index % 2 == 0) {
                        consumer.acknowledge(message);
                        ackedCount++;
                    }
                }
                Assert.assertNull(consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS));

                String partitionTopic = TopicName.get(TOPIC_MESSAGE_ACK_TEST).getPartition(0).toString();
                PersistentTopicInternalStats internalStats =
                        admin.topics().getInternalStats(partitionTopic, false);
                // With unacknowledged messages outstanding the cursor must not be at the end.
                Assert.assertNotEquals(
                        ((ManagedLedgerInternalStats.CursorStats) internalStats.cursors.get(subscriptionName))
                                .markDeletePosition,
                        internalStats.lastConfirmedEntry);

                // Second pass: everything not acknowledged above must be redelivered.
                consumer.redeliverUnacknowledgedMessages();
                final int expectedRedelivered = messageCount - ackedCount;
                for (int n = 0; n < expectedRedelivered; n++) {
                    Message<byte[]> message = consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS);
                    Assert.assertNotNull(message);
                    consumer.acknowledge(message);
                    redeliveredCount++;
                }
                Assert.assertNull(consumer.receive(receiveTimeoutSec, TimeUnit.SECONDS));

                // Broker-side check; the topic map is read reflectively from the "topics" field.
                Field topicsField = BrokerService.class.getDeclaredField("topics");
                topicsField.setAccessible(true);
                boolean topicFound = false;
                for (int broker = 0; broker < getPulsarServiceList().size(); broker++) {
                    CompletableFuture topicFuture = (CompletableFuture) ((ConcurrentOpenHashMap) topicsField
                            .get(getPulsarServiceList().get(broker).getBrokerService())).get(partitionTopic);
                    if (topicFuture == null) {
                        continue;
                    }
                    Optional topicOptional = (Optional) topicFuture.get();
                    if (!topicOptional.isPresent()) {
                        continue;
                    }
                    PersistentSubscription subscription =
                            ((Topic) topicOptional.get()).getSubscription(subscriptionName);
                    PositionImpl markDeleted = subscription.getCursor().getMarkDeletedPosition();
                    Position lastConfirmed = subscription.getCursor().getManagedLedger().getLastConfirmedEntry();
                    topicFound = true;
                    // Accept "exactly at" or "one valid position before" the last confirmed entry.
                    if (!markDeleted.equals(lastConfirmed)
                            && !subscription.getCursor().getManagedLedger()
                                    .getNextValidPosition(markDeleted).equals(lastConfirmed)) {
                        log.error("Mark delete position is not commit marker position!");
                        Assert.fail();
                    }
                }
                Assert.assertTrue(topicFound);
            }
        }

        resetTopicOutput();
        log.info("receive transaction messages count: {}", redeliveredCount);
    }

    /**
     * Runs the cumulative-ack transaction scenario with batching enabled (up to 200 messages per
     * batch) on a {@code Failover} subscription.
     */
    @Test
    public void txnAckTestBatchAndCumulativeSub() throws Exception {
        final boolean batchingEnabled = true;
        final int maxBatchedMessages = 200;
        txnCumulativeAckTest(batchingEnabled, maxBatchedMessages, SubscriptionType.Failover);
    }

    /**
     * Runs the cumulative-ack transaction scenario with batching disabled (max 1 message per
     * batch) on a {@code Failover} subscription.
     */
    @Test
    public void txnAckTestNoBatchAndCumulativeSub() throws Exception {
        final boolean batchingEnabled = false;
        final int maxBatchedMessages = 1;
        txnCumulativeAckTest(batchingEnabled, maxBatchedMessages, SubscriptionType.Failover);
    }

    private void txnCumulativeAckTest(boolean z, int i, SubscriptionType subscriptionType) throws Exception {
        Consumer subscribe = this.pulsarClient.newConsumer().topic(new String[]{"tnx/ns1/normal-topic"}).subscriptionName("test").enableBatchIndexAcknowledgment(true).subscriptionType(subscriptionType).ackTimeout(1L, TimeUnit.MINUTES).subscribe();
        try {
            ConditionFactory await = Awaitility.await();
            Objects.requireNonNull(subscribe);
            await.until(subscribe::isConnected);
            Producer create = this.pulsarClient.newProducer().topic("tnx/ns1/normal-topic").enableBatching(z).batchingMaxMessages(i).batchingMaxPublishDelay(1L, TimeUnit.SECONDS).create();
            for (int i2 = 0; i2 < 2; i2++) {
                try {
                    Transaction txn = getTxn();
                    for (int i3 = 0; i3 < 1000; i3++) {
                        create.newMessage().value("hello".getBytes()).sendAsync();
                    }
                    Message message = null;
                    Thread.sleep(1000L);
                    for (int i4 = 0; i4 < 1000; i4++) {
                        message = subscribe.receive(5, TimeUnit.SECONDS);
                        Assert.assertNotNull(message);
                        if (i4 % 3 == 0) {
                            subscribe.acknowledgeCumulativeAsync(message.getMessageId(), txn).get();
                        }
                        log.info("receive msgId abort: {}, retryCount : {}, count : {}", new Object[]{message.getMessageId(), Integer.valueOf(i2), Integer.valueOf(i4)});
                    }
                    try {
                        subscribe.acknowledgeCumulativeAsync(message.getMessageId(), txn).get();
                        Assert.fail("not ack conflict ");
                    } catch (Exception e) {
                        Assert.assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
                    }
                    try {
                        subscribe.acknowledgeCumulativeAsync(DefaultImplementation.getDefaultImplementation().newMessageId(message.getMessageId().getLedgerId(), message.getMessageId().getEntryId() - 1, -1), txn).get();
                        Assert.fail("not ack conflict ");
                    } catch (Exception e2) {
                        Assert.assertTrue(e2.getCause() instanceof PulsarClientException.TransactionConflictException);
                    }
                    Assert.assertNull(subscribe.receive(5, TimeUnit.SECONDS));
                    txn.abort().get();
                    subscribe.redeliverUnacknowledgedMessages();
                    Transaction txn2 = getTxn();
                    for (int i5 = 0; i5 < 1000; i5++) {
                        Message receive = subscribe.receive(5, TimeUnit.SECONDS);
                        Assert.assertNotNull(receive);
                        if (i5 % 3 == 0) {
                            subscribe.acknowledgeCumulativeAsync(receive.getMessageId(), txn2).get();
                        }
                        log.info("receive msgId abort: {}, retryCount : {}, count : {}", new Object[]{receive.getMessageId(), Integer.valueOf(i2), Integer.valueOf(i5)});
                    }
                    txn2.commit().get();
                    Field declaredField = TransactionImpl.class.getDeclaredField("state");
                    declaredField.setAccessible(true);
                    declaredField.set(txn2, Transaction.State.OPEN);
                    try {
                        txn2.commit().get();
                        Assert.fail("recommit one transaction should be failed.");
                    } catch (Exception e3) {
                        log.info("expected exception for recommit one transaction.");
                        Assert.assertNotNull(e3);
                        Assert.assertTrue(e3.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException);
                    }
                    Assert.assertNull(subscribe.receive(5, TimeUnit.SECONDS));
                } catch (Throwable th) {
                    if (Collections.singletonList(create).get(0) != null) {
                        create.close();
                    }
                    throw th;
                }
            }
            create.close();
            subscribe.close();
            this.admin.topics().delete("tnx/ns1/normal-topic", true);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } finally {
            if (Collections.singletonList(subscribe).get(0) != null) {
                subscribe.close();
            }
        }
    }

    /**
     * Opens a new transaction on the shared test client and waits for the build future to complete.
     *
     * @return the transaction produced by the builder, configured with a 10-minute timeout
     * @throws Exception if the build future completes exceptionally or the wait is interrupted
     */
    public Transaction getTxn() throws Exception {
        CompletableFuture<Transaction> pendingTxn = this.pulsarClient.newTransaction()
                .withTransactionTimeout(10L, TimeUnit.MINUTES)
                .build();
        return pendingTxn.get();
    }

    /**
     * Opens 20 transactions on the shared client, publishes 10 messages under each, commits every
     * transaction through the coordinator client of a separately built client, and then checks that
     * a consumer created on that second client receives all 200 messages.
     *
     * @throws Exception if any client operation fails or a wait is interrupted
     */
    @Test
    public void txnMetadataHandlerRecoverTest() throws Exception {
        final String topic = "tnx/ns1/tc-metadata-handler-recover";
        final int txnCount = 20;
        final int messagesPerTxn = 10;

        // try-with-resources closes in reverse order (consumer, second client, producer) and keeps the
        // original failure as the primary exception even if a close() call throws as well.
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(topic)
                .sendTimeout(0, TimeUnit.SECONDS)
                .create()) {
            List<TxnID> openTxnIds = new ArrayList<>();
            for (int txnIndex = 0; txnIndex < txnCount; txnIndex++) {
                TransactionImpl txn = (TransactionImpl) this.pulsarClient.newTransaction()
                        .withTransactionTimeout(5L, TimeUnit.MINUTES)
                        .build()
                        .get();
                for (int msgIndex = 0; msgIndex < messagesPerTxn; msgIndex++) {
                    producer.newMessage(txn)
                            .value("Hello".getBytes(StandardCharsets.UTF_8))
                            .sendAsync()
                            .get();
                }
                openTxnIds.add(new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
            }

            try (PulsarClientImpl secondClient = (PulsarClientImpl) PulsarClient.builder()
                    .serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
                    .statsInterval(0L, TimeUnit.SECONDS)
                    .enableTransaction(true)
                    .build()) {
                TransactionCoordinatorClientImpl tcClient = secondClient.getTcClient();
                for (TxnID txnId : openTxnIds) {
                    tcClient.commit(txnId);
                }

                try (Consumer<byte[]> consumer = secondClient.newConsumer()
                        .topic(topic)
                        .subscriptionName("test")
                        .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                        .subscribe()) {
                    Awaitility.await().until(consumer::isConnected);
                    for (int received = 0; received < txnCount * messagesPerTxn; received++) {
                        Assert.assertNotNull(consumer.receive(5, TimeUnit.SECONDS));
                    }

                    // Everything is closed explicitly before the forced topic deletion; the implicit
                    // close at the end of each try block then repeats the call, as the original did.
                    producer.close();
                    consumer.close();
                    secondClient.close();
                    this.admin.topics().delete(topic, true);
                }
            }
        }
    }

    /**
     * Runs 10 rounds in which 1000 numbered messages are published asynchronously under one
     * transaction, the transaction is committed, and the consumer is expected to receive the
     * messages in exactly the order they were sent (0..999).
     *
     * @throws Exception if any client operation fails or a wait is interrupted
     */
    @Test
    public void produceTxnMessageOrderTest() throws Exception {
        final String topic = "tnx/ns1/txn-produce-order";
        final int rounds = 10;
        final int messagesPerRound = 1000;

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName("test")
                .subscribe()) {
            Awaitility.await().until(consumer::isConnected);

            // The producer is scoped to its own try block so it is closed on every failure path,
            // not only on failures raised inside the loop body.
            try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                    .topic(topic)
                    .sendTimeout(0, TimeUnit.SECONDS)
                    .producerName("txn-publish-order")
                    .create()) {
                for (int round = 0; round < rounds; round++) {
                    Transaction txn = this.pulsarClient.newTransaction()
                            .withTransactionTimeout(2L, TimeUnit.SECONDS)
                            .build()
                            .get();
                    for (int seq = 0; seq < messagesPerRound; seq++) {
                        // The payload is the decimal text of the sequence number.
                        producer.newMessage(txn)
                                .value(String.valueOf(seq).getBytes(StandardCharsets.UTF_8))
                                .sendAsync();
                    }
                    txn.commit().get();

                    for (int expectedSeq = 0; expectedSeq < messagesPerRound; expectedSeq++) {
                        Message<byte[]> received = consumer.receive(5, TimeUnit.SECONDS);
                        Assert.assertNotNull(received);
                        Assert.assertEquals(
                                Integer.valueOf(new String(received.getData(), StandardCharsets.UTF_8)),
                                Integer.valueOf(expectedSeq));
                    }
                }

                // Explicit closes precede the forced topic deletion; the implicit closes that follow
                // repeat the call, as the original cleanup did.
                producer.close();
                consumer.close();
                this.admin.topics().delete(topic, true);
            }
        }
    }

    /**
     * Checks that a transaction which has already ended rejects further work:
     * <ul>
     *   <li>after a commit, producing with or re-committing the same transaction must fail with
     *       {@code InvalidTxnStatusException};</li>
     *   <li>after a commit, acknowledging with or re-committing the same transaction must fail with
     *       {@code InvalidTxnStatusException};</li>
     *   <li>once the metadata store no longer returns a 1-second transaction, committing a
     *       reflectively built {@code TransactionImpl} with the same id must fail with
     *       {@code TransactionNotFoundException} and leave that instance in state {@code ERROR}.</li>
     * </ul>
     *
     * @throws Exception if any client operation fails unexpectedly or a wait is interrupted
     */
    @Test
    public void produceAndConsumeCloseStateTxnTest() throws Exception {
        final String topic = "tnx/ns1/txn-close-state";
        final byte[] payload = "Hello Pulsar!".getBytes(StandardCharsets.UTF_8);

        // Resources are closed in reverse order (producer, then consumer); a failing close() is
        // recorded as suppressed instead of replacing the original test failure.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName("test")
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topic)
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .producerName("txn-close-state")
                     .create()) {
            Transaction produceTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(2L, TimeUnit.SECONDS)
                    .build()
                    .get();
            Transaction ackTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(2L, TimeUnit.SECONDS)
                    .build()
                    .get();

            // --- produce side: a committed transaction rejects further sends and commits ---
            producer.newMessage(produceTxn).value(payload).sendAsync().get();
            produceTxn.commit().get();
            try {
                producer.newMessage(produceTxn).value(payload).sendAsync().get();
                Assert.fail();
            } catch (Exception sendAfterCommit) {
                Assert.assertTrue(sendAfterCommit.getCause()
                        instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
            }
            try {
                produceTxn.commit().get();
                Assert.fail();
            } catch (Exception recommit) {
                Assert.assertTrue(recommit.getCause()
                        instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
            }

            // --- consume side: a committed transaction rejects further acks and commits ---
            Message<byte[]> received = consumer.receive(5, TimeUnit.SECONDS);
            // Fail with a clear assertion rather than an NPE if nothing arrives within the timeout.
            Assert.assertNotNull(received);
            consumer.acknowledgeAsync(received.getMessageId(), ackTxn).get();
            ackTxn.commit().get();
            try {
                consumer.acknowledgeAsync(received.getMessageId(), ackTxn).get();
                Assert.fail();
            } catch (Exception ackAfterCommit) {
                Assert.assertTrue(ackAfterCommit.getCause()
                        instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
            }
            try {
                ackTxn.commit().get();
                Assert.fail();
            } catch (Exception recommit) {
                Assert.assertTrue(recommit.getCause()
                        instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
            }

            // --- short-lived transaction: wait until the metadata store stops returning it ---
            Transaction shortLivedTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(1L, TimeUnit.SECONDS)
                    .build()
                    .get();
            TransactionImpl shortLivedImpl = (TransactionImpl) shortLivedTxn;
            TransactionCoordinatorID coordinatorId =
                    TransactionCoordinatorID.get(shortLivedImpl.getTxnIdMostBits());
            TxnID shortLivedId =
                    new TxnID(shortLivedImpl.getTxnIdMostBits(), shortLivedImpl.getTxnIdLeastBits());

            AtomicReference<TransactionMetadataStore> owningStore = new AtomicReference<>();
            getPulsarServiceList().forEach(pulsarService -> {
                TransactionMetadataStore candidate =
                        pulsarService.getTransactionMetadataStoreService().getStores().get(coordinatorId);
                if (candidate != null) {
                    owningStore.set(candidate);
                }
            });
            // Without this guard a missing store would surface as an NPE inside the polling lambda,
            // be swallowed by its catch block, and make the wait below succeed vacuously.
            Assert.assertNotNull(owningStore.get(), "no broker hosts the transaction's coordinator store");

            Awaitility.await().until(() -> {
                try {
                    owningStore.get().getTxnMeta(shortLivedId).get();
                    return false;
                } catch (Exception lookupFailed) {
                    // The lookup failing is the condition being waited for.
                    return true;
                }
            });

            // Build a second TransactionImpl for the same id through the non-public constructor.
            Constructor<TransactionImpl> txnConstructor = TransactionImpl.class.getDeclaredConstructor(
                    PulsarClientImpl.class, long.class, long.class, long.class);
            txnConstructor.setAccessible(true);
            TransactionImpl rebuiltTxn = txnConstructor.newInstance(
                    this.pulsarClient,
                    5L,
                    shortLivedTxn.getTxnID().getLeastSigBits(),
                    shortLivedTxn.getTxnID().getMostSigBits());
            try {
                rebuiltTxn.commit().get();
                Assert.fail();
            } catch (Exception commitMissingTxn) {
                Assert.assertTrue(commitMissingTxn.getCause()
                        instanceof TransactionCoordinatorClientException.TransactionNotFoundException);
            }

            Field stateField = TransactionImpl.class.getDeclaredField("state");
            stateField.setAccessible(true);
            Assert.assertEquals((Transaction.State) stateField.get(rebuiltTxn), Transaction.State.ERROR);

            // NOTE(review): the abort future is intentionally not awaited here, as in the original;
            // presumably a failure is acceptable because the transaction is already gone — confirm.
            shortLivedTxn.abort();

            // Explicit closes precede the forced topic deletion; the implicit closes then repeat them.
            producer.close();
            consumer.close();
            this.admin.topics().delete(topic, true);
        }
    }

    /**
     * Creates a transaction with a 1 ms timeout directly on the first broker's metadata store
     * service, waits until looking it up fails, and asserts that the summed {@code timeoutCount}
     * over that broker's stores grew by exactly one.
     *
     * @throws Exception if creating the transaction fails or a wait is interrupted
     */
    @Test
    public void testTxnTimeoutAtTransactionMetadataStore() throws Exception {
        TransactionMetadataStoreService storeService =
                getPulsarServiceList().get(0).getTransactionMetadataStoreService();
        // Typed collection: with a raw Collection the lambda parameter below would be an Object.
        Collection<TransactionMetadataStore> stores = storeService.getStores().values();

        long timeoutsBefore = stores.stream()
                .mapToLong(store -> store.getMetadataStoreStats().timeoutCount)
                .sum();

        TxnID txnId = storeService
                .newTransaction(new TransactionCoordinatorID(0L), 1L, (String) null)
                .get();

        Awaitility.await().until(() -> {
            try {
                storeService.getTxnMeta(txnId).get();
                return false;
            } catch (Exception lookupFailed) {
                // The lookup failing is the condition being waited for.
                return true;
            }
        });

        long timeoutsAfter = stores.stream()
                .mapToLong(store -> store.getMetadataStoreStats().timeoutCount)
                .sum();
        Assert.assertEquals(timeoutsAfter, timeoutsBefore + 1);
    }

    /**
     * Acknowledges a message inside a transaction with a 7-second timeout and never ends that
     * transaction. The first 5-second receive after the ack must return nothing; the following
     * 5-second receive must return the same message (same value and message id) again.
     *
     * @throws Exception if any client operation fails or a wait is interrupted
     */
    @Test
    public void transactionTimeoutTest() throws Exception {
        final String topic = "tnx/ns1/txn-timeout";

        // Resources are closed in reverse order (producer, then consumer); a failing close() is
        // recorded as suppressed instead of replacing the original test failure.
        try (Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("test")
                     .subscribe();
             Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .enableBatching(false)
                     .producerName("txn-timeout")
                     .create()) {
            producer.send("Hello Pulsar!");

            Transaction txn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(7L, TimeUnit.SECONDS)
                    .build()
                    .get();

            Message<String> firstDelivery = consumer.receive(5, TimeUnit.SECONDS);
            // Fail with a clear assertion rather than an NPE if nothing arrives within the timeout.
            Assert.assertNotNull(firstDelivery);
            consumer.acknowledgeAsync(firstDelivery.getMessageId(), txn).get();

            // Nothing is expected while the transaction is still within its timeout.
            Assert.assertNull(consumer.receive(5, TimeUnit.SECONDS));

            Message<String> secondDelivery = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(secondDelivery);
            Assert.assertEquals(secondDelivery.getValue(), firstDelivery.getValue());
            Assert.assertEquals(secondDelivery.getMessageId(), firstDelivery.getMessageId());

            // NOTE(review): the abort future is not awaited, as in the original; presumably its
            // outcome is irrelevant once the transaction has timed out — confirm.
            txn.abort();

            // Explicit closes precede the forced topic deletion; the implicit closes then repeat them.
            producer.close();
            consumer.close();
            this.admin.topics().delete(topic, true);
        }
    }

    /**
     * Data provider named {@code "ackType"}.
     *
     * @return a two-element array holding the cumulative ack type followed by the individual one
     */
    @DataProvider(name = "ackType")
    public static Object[] ackType() {
        Object[] ackTypes = new Object[2];
        ackTypes[0] = CommandAck.AckType.Cumulative;
        ackTypes[1] = CommandAck.AckType.Individual;
        return ackTypes;
    }

    /**
     * Acknowledges one message inside a transaction, reflectively sets the subscription's
     * {@code dispatcher} field to {@code null} on every broker that has the topic loaded, and then
     * checks that aborting the transaction still completes.
     *
     * @param ackType whether the transactional ack is sent as an individual or a cumulative ack
     * @throws Exception if any client operation or reflective access fails
     */
    @Test(dataProvider = "ackType")
    public void txnTransactionRedeliverNullDispatcher(CommandAck.AckType ackType) throws Exception {
        final String topic = "tnx/ns1/txnTransactionRedeliverNullDispatcher";
        final String subscriptionName = "test";

        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(subscriptionName)
                .enableBatchIndexAcknowledgment(true)
                .acknowledgmentGroupTime(0L, TimeUnit.MILLISECONDS)
                .subscribe()) {
            Awaitility.await().until(consumer::isConnected);

            // The producer is scoped to its own try block so it is closed on every failure path,
            // not only on failures raised while sending.
            try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                    .topic(topic)
                    .sendTimeout(0, TimeUnit.SECONDS)
                    .enableBatching(false)
                    .create()) {
                for (int i = 0; i < 10; i++) {
                    producer.send(("Hello Txn - " + i).getBytes(StandardCharsets.UTF_8));
                }

                Transaction txn = getTxn();
                Message<byte[]> received = consumer.receive(5, TimeUnit.SECONDS);
                // Fail with a clear assertion rather than an NPE if nothing arrives within the timeout.
                Assert.assertNotNull(received);
                // NOTE(review): the ack future is not awaited, as in the original — confirm that the
                // test does not rely on the ack having reached the broker before the abort below.
                if (ackType == CommandAck.AckType.Individual) {
                    consumer.acknowledgeAsync(received.getMessageId(), txn);
                } else {
                    consumer.acknowledgeCumulativeAsync(received.getMessageId(), txn);
                }

                String topicName = TopicName.get(topic).toString();
                // The reflective handle does not depend on the broker, so it is resolved once.
                Field topicsField = BrokerService.class.getDeclaredField("topics");
                topicsField.setAccessible(true);

                boolean dispatcherCleared = false;
                for (int i = 0; i < getPulsarServiceList().size(); i++) {
                    @SuppressWarnings("unchecked") // the field's declared type is not visible via reflection
                    ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> brokerTopics =
                            (ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>)
                                    topicsField.get(getPulsarServiceList().get(i).getBrokerService());
                    CompletableFuture<Optional<Topic>> topicFuture = brokerTopics.get(topicName);
                    if (topicFuture == null) {
                        continue;
                    }
                    Optional<Topic> loadedTopic = topicFuture.get();
                    if (loadedTopic.isPresent()) {
                        PersistentSubscription subscription =
                                (PersistentSubscription) loadedTopic.get().getSubscription(subscriptionName);
                        Field dispatcherField = subscription.getClass().getDeclaredField("dispatcher");
                        dispatcherField.setAccessible(true);
                        dispatcherField.set(subscription, null);
                        dispatcherCleared = true;
                    }
                }

                txn.abort().get();
                Assert.assertTrue(dispatcherCleared);

                // Explicit closes precede the forced topic deletion; the implicit closes then repeat them.
                producer.close();
                consumer.close();
                this.admin.topics().delete(topicName, true);
            }
        }
    }

    /**
     * Acknowledges the same message through two subscriptions ("test1" and "test2") inside one
     * transaction and checks that the transaction metadata lists exactly those two subscriptions
     * as acked partitions, in either order.
     *
     * @throws Exception if any client operation fails or a wait is interrupted
     */
    @Test
    public void oneTransactionOneTopicWithMultiSubTest() throws Exception {
        final String topic = "tnx/ns1/oneTransactionOneTopicWithMultiSubTest";
        final String firstSubscription = "test1";
        final String secondSubscription = "test2";

        // Each consumer gets its own variable. Sharing one variable would overwrite (and leak) the
        // first consumer and route both acks through the second subscription.
        try (Consumer<byte[]> firstConsumer = this.pulsarClient.newConsumer()
                .topic(topic)
                .subscriptionName(firstSubscription)
                .acknowledgmentGroupTime(0L, TimeUnit.MILLISECONDS)
                .subscribe()) {
            Awaitility.await().until(firstConsumer::isConnected);

            try (Consumer<byte[]> secondConsumer = this.pulsarClient.newConsumer()
                    .topic(topic)
                    .subscriptionName(secondSubscription)
                    .acknowledgmentGroupTime(0L, TimeUnit.MILLISECONDS)
                    .subscribe()) {
                Awaitility.await().until(secondConsumer::isConnected);

                try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                        .topic(topic)
                        .sendTimeout(0, TimeUnit.SECONDS)
                        .enableBatching(false)
                        .create()) {
                    MessageId sentId = producer.send("Hello Pulsar".getBytes(StandardCharsets.UTF_8));
                    TransactionImpl txn = (TransactionImpl) getTxn();
                    firstConsumer.acknowledgeAsync(sentId, txn).get();
                    secondConsumer.acknowledgeAsync(sentId, txn).get();

                    TransactionCoordinatorID coordinatorId =
                            TransactionCoordinatorID.get(txn.getTxnIdMostBits());
                    TxnID txnId = new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());

                    boolean coordinatorFound = false;
                    for (int i = 0; i < getPulsarServiceList().size(); i++) {
                        TransactionMetadataStoreService storeService =
                                getPulsarServiceList().get(i).getTransactionMetadataStoreService();
                        if (!storeService.getStores().containsKey(coordinatorId)) {
                            continue;
                        }
                        coordinatorFound = true;
                        List<TransactionSubscription> acked =
                                storeService.getTxnMeta(txnId).get().ackedPartitions();
                        Assert.assertEquals(acked.size(), 2);
                        // The two subscriptions may be listed in either order.
                        if (acked.get(0).getSubscription().equals(firstSubscription)) {
                            Assert.assertEquals(acked.get(1).getSubscription(), secondSubscription);
                        } else {
                            Assert.assertEquals(acked.get(0).getSubscription(), secondSubscription);
                            Assert.assertEquals(acked.get(1).getSubscription(), firstSubscription);
                        }
                    }
                    Assert.assertTrue(coordinatorFound);

                    txn.abort().get();

                    // Explicit closes precede the forced topic deletion; the implicit closes then
                    // repeat them.
                    producer.close();
                    firstConsumer.close();
                    secondConsumer.close();
                    this.admin.topics().delete(topic, true);
                }
            }
        }
    }

    /**
     * Waits until a 1-second transaction reports state {@code TIME_OUT} on the client and then
     * checks that both producing and acknowledging with it fail with
     * {@code InvalidTxnStatusException}.
     *
     * @throws Exception if any client operation fails unexpectedly or a wait is interrupted
     */
    @Test
    public void testTxnTimeOutInClient() throws Exception {
        final String topic = "tnx/ns1/testTxnTimeOutInClient";

        // Resources are closed in reverse order (consumer, then producer); a failing close() is
        // recorded as suppressed instead of replacing the original test failure.
        try (Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .producerName("testTxnTimeOut_producer")
                     .topic(topic)
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .enableBatching(false)
                     .create();
             Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .consumerName("testTxnTimeOut_consumer")
                     .topic(topic)
                     .subscriptionName("testTxnTimeOut_sub")
                     .subscribe()) {
            Transaction txn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(1L, TimeUnit.SECONDS)
                    .build()
                    .get();

            // A non-transactional message, so the consumer has something to acknowledge later.
            producer.newMessage().send();

            Awaitility.await().untilAsserted(() ->
                    Assert.assertEquals(((TransactionImpl) txn).getState(), Transaction.State.TIME_OUT));

            try {
                producer.newMessage(txn).send();
                Assert.fail();
            } catch (Exception sendFailure) {
                // The synchronous send wraps the status exception two levels deep.
                Assert.assertTrue(sendFailure.getCause().getCause()
                        instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
            }

            // Received outside the try block so a missing message is reported as such, rather than
            // as an NPE caught below and turned into a misleading assertion failure.
            Message<String> received = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertNotNull(received);
            try {
                consumer.acknowledgeAsync(received.getMessageId(), txn).get();
                Assert.fail();
            } catch (Exception ackFailure) {
                Assert.assertTrue(ackFailure.getCause()
                        instanceof TransactionCoordinatorClientException.InvalidTxnStatusException);
            }

            // Explicit closes precede the forced topic deletion; the implicit closes then repeat them.
            producer.close();
            consumer.close();
            this.admin.topics().delete(topic, true);
        }
    }

    /**
     * Cumulatively acknowledges the first three of five messages in one transaction, checks that a
     * second transaction acknowledging the same position fails with
     * {@code TransactionConflictException}, aborts the first transaction, requests redelivery, and
     * then expects all five messages again before acknowledging them in a third, committed
     * transaction.
     *
     * @throws Exception if any client operation fails unexpectedly or a wait is interrupted
     */
    @Test
    public void testCumulativeAckRedeliverMessages() throws Exception {
        final String topic = "tnx/ns1/testCumulativeAckRedeliverMessages";
        final int messageCount = 5;
        final int firstAckCount = 3;

        // Both resources live in one try block so the producer is closed on every failure path,
        // not only on failures raised while sending.
        try (Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionName("test")
                     .subscribe();
             Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(topic)
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .create()) {
            for (int i = 0; i < messageCount; i++) {
                // The payload is the decimal text of the message index.
                producer.send(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
            }

            Transaction firstTxn = getTxn();
            Transaction conflictingTxn = getTxn();

            Message<byte[]> lastReceived = null;
            for (int i = 0; i < firstAckCount; i++) {
                lastReceived = consumer.receive(5, TimeUnit.SECONDS);
            }
            // Fail with a clear assertion rather than an NPE if the last receive timed out.
            Assert.assertNotNull(lastReceived);
            Assert.assertEquals(lastReceived.getValue(),
                    String.valueOf(firstAckCount - 1).getBytes(StandardCharsets.UTF_8));
            consumer.acknowledgeCumulativeAsync(lastReceived.getMessageId(), firstTxn).get();

            try {
                consumer.acknowledgeCumulativeAsync(lastReceived.getMessageId(), conflictingTxn).get();
                Assert.fail();
            } catch (ExecutionException conflict) {
                Assert.assertTrue(conflict.getCause()
                        instanceof PulsarClientException.TransactionConflictException);
            }

            // Only reachable through the catch block above: the try body either throws or fails.
            firstTxn.abort().get();
            consumer.redeliverUnacknowledgedMessages();

            for (int i = 0; i < messageCount; i++) {
                lastReceived = consumer.receive(5, TimeUnit.SECONDS);
            }
            Transaction finalTxn = getTxn();
            Assert.assertNotNull(lastReceived);
            Assert.assertEquals(lastReceived.getValue(),
                    String.valueOf(messageCount - 1).getBytes(StandardCharsets.UTF_8));
            consumer.acknowledgeCumulativeAsync(lastReceived.getMessageId(), finalTxn).get();
            finalTxn.commit().get();

            Assert.assertNull(consumer.receive(5, TimeUnit.SECONDS));

            // Explicit closes precede the forced topic deletion; the implicit closes then repeat them.
            producer.close();
            consumer.close();
            this.admin.topics().delete(topic, true);
        }
    }

    /**
     * Replaces the producer's connection with a Mockito mock and checks that a transactional send
     * on a producer with a 1-second send timeout fails with
     * {@code PulsarClientException.TimeoutException}.
     *
     * @throws Exception if any client operation fails unexpectedly
     */
    @Test
    public void testSendTxnMessageTimeout() throws Exception {
        final String topic = "tnx/ns1/testSendTxnMessageTimeout";

        // A failing close() is recorded as suppressed instead of replacing the original failure.
        try (ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                .topic(topic)
                .sendTimeout(1, TimeUnit.SECONDS)
                .create()) {
            Transaction txn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(5L, TimeUnit.SECONDS)
                    .build()
                    .get();

            // Mocked connection: ctx() yields a mocked channel context whose executor() is a mock,
            // and sendRequestWithId(...) returns an already-completed producer response.
            ClientCnx mockedCnx = Mockito.mock(ClientCnx.class);
            ChannelHandlerContext mockedCtx = Mockito.mock(ChannelHandlerContext.class);
            Mockito.doReturn(mockedCtx).when(mockedCnx).ctx();
            Mockito.doReturn(Mockito.mock(EventExecutor.class)).when(mockedCtx).executor();

            CompletableFuture<ProducerResponse> producerResponse = new CompletableFuture<>();
            producerResponse.complete(
                    new ProducerResponse("test", 1L, "1".getBytes(StandardCharsets.UTF_8), Optional.of(30L)));
            Mockito.doReturn(producerResponse).when(mockedCnx)
                    .sendRequestWithId(ArgumentMatchers.<ByteBuf>any(), ArgumentMatchers.anyLong());

            producer.getConnectionHandler().setClientCnx(mockedCnx);

            try {
                producer.newMessage(txn).value("Hello Pulsar!".getBytes(StandardCharsets.UTF_8)).send();
                Assert.fail();
            } catch (PulsarClientException sendFailure) {
                Assert.assertTrue(sendFailure instanceof PulsarClientException.TimeoutException);
            }

            txn.abort().get();

            // The explicit close precedes the forced topic deletion; the implicit close repeats it.
            producer.close();
            this.admin.topics().delete(topic, true);
        }
    }

    /**
     * Receives five messages on a shared subscription, removes the first message's entry from the
     * broker-side consumer's pending acks, transactionally acknowledges the second message, and
     * asserts that the broker-side consumer then reports four unacked messages.
     *
     * @throws Exception if any client or broker operation fails or a wait is interrupted
     */
    @Test
    public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exception {
        final String topic = "persistent://tnx/ns1/testAckWithTransactionReduceUnackCountNotInPendingAcks";
        final String subscriptionName = "test";
        final int messageCount = 5;

        // Both resources live in one try block so the consumer is closed on every failure path,
        // not only on failures raised while sending. Close order is consumer, then producer.
        try (ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                     .topic(topic)
                     .batchingMaxPublishDelay(1L, TimeUnit.SECONDS)
                     .sendTimeout(1, TimeUnit.SECONDS)
                     .create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionType(SubscriptionType.Shared)
                     .subscriptionName(subscriptionName)
                     .subscribe()) {
            for (int i = 0; i < messageCount; i++) {
                // The payload is the decimal text of the message index.
                producer.sendAsync(String.valueOf(i).getBytes(StandardCharsets.UTF_8));
            }

            List<MessageId> receivedIds = new ArrayList<>();
            for (int i = 0; i < messageCount; i++) {
                Message<byte[]> received = consumer.receive(5, TimeUnit.SECONDS);
                // Fail with a clear assertion rather than an NPE if nothing arrives within the timeout.
                Assert.assertNotNull(received);
                receivedIds.add(received.getMessageId());
            }

            // Broker-side view of the single consumer attached to the subscription. It is resolved
            // once and reused for both the pending-ack removal and the final unacked-count check.
            org.apache.pulsar.broker.service.Consumer brokerConsumer = getPulsarServiceList().get(0)
                    .getBrokerService()
                    .getTopic(topic, false)
                    .get()
                    .orElseThrow(() -> new AssertionError("topic is not loaded on the first broker"))
                    .getSubscription(subscriptionName)
                    .getConsumers()
                    .get(0);

            MessageIdImpl firstId = (MessageIdImpl) receivedIds.get(0);
            brokerConsumer.getPendingAcks().remove(firstId.ledgerId, firstId.entryId);

            Transaction txn = getTxn();
            consumer.acknowledgeAsync(receivedIds.get(1), txn).get();
            Assert.assertEquals(brokerConsumer.getUnackedMessages(), 4);
            txn.abort().get();

            // Explicit closes precede the forced topic deletion; the implicit closes then repeat them.
            consumer.close();
            producer.close();
            this.admin.topics().delete(topic, true);
        }
    }

    /**
     * Checks that a message whose transactional acknowledgement keeps being aborted is
     * eventually readable from the dead letter topic of the subscription.
     *
     * <p>Flow: one non-batched message is published. The consumer, configured with a dead
     * letter policy of {@code maxRedeliverCount(1)}, receives it twice and each time
     * acknowledges it inside a transaction that is then aborted. The test then expects a
     * third {@code receive} to return {@code null}, the consumer to report 3 available
     * permits, and the payload to arrive on the {@code <topic>-<subscription>-DLQ} topic.
     *
     * @throws Exception if any client, transaction or admin operation fails
     */
    @Test
    public void testSendTxnAckMessageToDLQ() throws Exception {
        final String topic = "tnx/ns1/testSendTxnAckMessageToDLQ";
        final String subscriptionName = "test";
        final String payload = "test";
        final String deadLetterTopic = String.format("%s-%s-DLQ", topic, subscriptionName);

        // Resources are closed in reverse declaration order (DLQ consumer, consumer, producer)
        // on every exit path; a failure while closing is attached as a suppressed exception
        // instead of hiding the assertion failure that triggered the cleanup.
        try (ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                     .topic(topic)
                     .enableBatching(false)
                     .sendTimeout(1, TimeUnit.SECONDS)
                     .create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionType(SubscriptionType.Shared)
                     .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
                     .subscriptionName(subscriptionName)
                     .subscribe();
             Consumer<byte[]> deadLetterConsumer = this.pulsarClient.newConsumer()
                     .topic(deadLetterTopic)
                     .subscriptionType(SubscriptionType.Shared)
                     .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
                     .subscriptionName("test")
                     .subscribe()) {

            // Explicit charset: the payload must not depend on the JVM's default encoding.
            producer.send(payload.getBytes(StandardCharsets.UTF_8));

            // First delivery: acknowledge within a transaction, then abort that transaction.
            Transaction firstTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(1L, TimeUnit.MINUTES)
                    .build()
                    .get();
            consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), firstTxn).get();
            firstTxn.abort().get();

            // Second delivery: same pattern with a fresh transaction.
            Transaction secondTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(5L, TimeUnit.MINUTES)
                    .build()
                    .get();
            consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), secondTxn).get();
            secondTxn.abort().get();

            // A third delivery attempt on the original subscription is expected to yield nothing.
            Assert.assertNull(consumer.receive(5, TimeUnit.SECONDS));
            Assert.assertEquals(((ConsumerImpl<byte[]>) consumer).getAvailablePermits(), 3);

            // The payload is expected on the dead letter topic instead.
            Assert.assertEquals(
                    new String(deadLetterConsumer.receive(5, TimeUnit.SECONDS).getValue(), StandardCharsets.UTF_8),
                    payload);

            // Close everything before deleting the topics, as the test did before; the
            // automatic close at the end of the try block repeats these calls.
            consumer.close();
            deadLetterConsumer.close();
            producer.close();
            this.admin.topics().delete(deadLetterTopic, true);
            this.admin.topics().delete(topic, true);
        }
    }

    /**
     * Variant of the dead letter test that publishes two messages with {@code sendAsync}
     * and without disabling batching on the producer (presumably so both land in one batch;
     * confirm against the producer defaults).
     *
     * <p>Flow: {@code test1} is received, acknowledged in a transaction and the transaction
     * is aborted. Three further receives follow, and the last of them is expected to be
     * {@code test2}; it is acknowledged in a second transaction that is aborted as well.
     * The test then expects the next {@code receive} to return {@code null}, the consumer to
     * report 6 available permits, and both payloads to arrive, in order, on the
     * {@code <topic>-<subscription>-DLQ} topic.
     *
     * @throws Exception if any client, transaction or admin operation fails
     */
    @Test
    public void testSendTxnAckBatchMessageToDLQ() throws Exception {
        final String topic = "tnx/ns1/testSendTxnAckBatchMessageToDLQ";
        final String subscriptionName = "test";
        final String firstPayload = "test1";
        final String secondPayload = "test2";
        final String deadLetterTopic = String.format("%s-%s-DLQ", topic, subscriptionName);

        // Resources are closed in reverse declaration order (DLQ consumer, consumer, producer)
        // on every exit path; a failure while closing is attached as a suppressed exception
        // instead of hiding the assertion failure that triggered the cleanup.
        try (ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) this.pulsarClient.newProducer()
                     .topic(topic)
                     .sendTimeout(1, TimeUnit.SECONDS)
                     .create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(topic)
                     .subscriptionType(SubscriptionType.Shared)
                     .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
                     .subscriptionName(subscriptionName)
                     .subscribe();
             Consumer<byte[]> deadLetterConsumer = this.pulsarClient.newConsumer()
                     .topic(deadLetterTopic)
                     .subscriptionType(SubscriptionType.Shared)
                     .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
                     .subscriptionName("test")
                     .subscribe()) {

            // Explicit charset: the payloads must not depend on the JVM's default encoding.
            producer.sendAsync(firstPayload.getBytes(StandardCharsets.UTF_8));
            producer.sendAsync(secondPayload.getBytes(StandardCharsets.UTF_8));

            Transaction firstTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(1L, TimeUnit.MINUTES)
                    .build()
                    .get();
            Message<byte[]> received = consumer.receive(5, TimeUnit.SECONDS);
            Assert.assertEquals(new String(received.getValue(), StandardCharsets.UTF_8), firstPayload);
            consumer.acknowledgeAsync(received.getMessageId(), firstTxn).get();
            firstTxn.abort().get();

            // Drain three deliveries and keep only the last one for the second transaction.
            for (int attempt = 0; attempt < 3; attempt++) {
                received = consumer.receive(5, TimeUnit.SECONDS);
            }

            Transaction secondTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(5L, TimeUnit.MINUTES)
                    .build()
                    .get();
            Assert.assertEquals(new String(received.getValue(), StandardCharsets.UTF_8), secondPayload);
            consumer.acknowledgeAsync(received.getMessageId(), secondTxn).get();
            secondTxn.abort().get();

            // Nothing further is expected on the original subscription.
            Assert.assertNull(consumer.receive(5, TimeUnit.SECONDS));
            Assert.assertEquals(((ConsumerImpl<byte[]>) consumer).getAvailablePermits(), 6);

            // Both payloads are expected on the dead letter topic, in publish order.
            Assert.assertEquals(
                    new String(deadLetterConsumer.receive(5, TimeUnit.SECONDS).getValue(), StandardCharsets.UTF_8),
                    firstPayload);
            Assert.assertEquals(
                    new String(deadLetterConsumer.receive(5, TimeUnit.SECONDS).getValue(), StandardCharsets.UTF_8),
                    secondPayload);

            // Close everything before deleting the topics, as the test did before; the
            // automatic close at the end of the try block repeats these calls.
            consumer.close();
            deadLetterConsumer.close();
            producer.close();
            this.admin.topics().delete(deadLetterTopic, true);
            this.admin.topics().delete(topic, true);
        }
    }

    /**
     * Publishes ten messages with {@code deliverAfter(5s)} inside one committed transaction
     * and checks how a failover subscription and a shared subscription observe them.
     *
     * <p>Expectations encoded by the assertions:
     * <ul>
     *   <li>the shared consumer receives nothing during the first 5 seconds;</li>
     *   <li>the failover consumer receives {@code msg-0} .. {@code msg-9} in publish order;</li>
     *   <li>the shared consumer then receives all ten messages, in any order.</li>
     * </ul>
     *
     * <p>The two consumers are held in separate variables. Previously a single variable was
     * reused for both, so the failover consumer was overwritten right after creation, never
     * read from and never closed, and all twenty-one receives hit the shared subscription.
     * NOTE(review): which consumer each assertion reads from is inferred from the subscription
     * types (only the shared subscription is presumed to honor the delivery delay); confirm
     * against the upstream test.
     *
     * @throws Exception if any client, transaction or admin operation fails
     */
    @Test
    public void testDelayedTransactionMessages() throws Exception {
        final String topic = "tnx/ns1/testDelayedTransactionMessages";
        final int messageCount = 10;

        // Resources are closed in reverse declaration order on every exit path.
        try (Consumer<String> failoverConsumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("failover-sub")
                     .subscriptionType(SubscriptionType.Failover)
                     .subscribe();
             Consumer<String> sharedConsumer = this.pulsarClient.newConsumer(Schema.STRING)
                     .topic(topic)
                     .subscriptionName("shared-sub")
                     .subscriptionType(SubscriptionType.Shared)
                     .subscribe();
             Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
                     .topic(topic)
                     .enableBatching(false)
                     .create()) {

            Transaction transaction = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(10L, TimeUnit.SECONDS)
                    .build()
                    .get();
            for (int i = 0; i < messageCount; i++) {
                producer.newMessage(transaction)
                        .value("msg-" + i)
                        .deliverAfter(5L, TimeUnit.SECONDS)
                        .sendAsync();
            }
            // Make sure every async send has been handed off before committing.
            producer.flush();
            transaction.commit().get();

            // The shared subscription must not see anything while the delay is still running.
            Assert.assertNull(sharedConsumer.receive(5, TimeUnit.SECONDS));

            // The failover subscription is expected to deliver in publish order.
            for (int i = 0; i < messageCount; i++) {
                Assert.assertEquals(failoverConsumer.receive(5, TimeUnit.SECONDS).getValue(), "msg-" + i);
            }

            // The shared subscription delivers all messages, but order is not asserted.
            TreeSet<String> sharedValues = new TreeSet<>();
            for (int i = 0; i < messageCount; i++) {
                sharedValues.add(sharedConsumer.receive(5, TimeUnit.SECONDS).getValue());
            }
            Assert.assertEquals(sharedValues.size(), messageCount);
            for (int i = 0; i < messageCount; i++) {
                Assert.assertTrue(sharedValues.contains("msg-" + i));
            }

            // Close everything before deleting the topic; the automatic close at the end of
            // the try block repeats these calls.
            failoverConsumer.close();
            sharedConsumer.close();
            producer.close();
            this.admin.topics().delete(topic, true);
        }
    }
}
