package org.apache.pulsar.broker.transaction.buffer;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBuffer;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/transaction/buffer/TransactionLowWaterMarkTest.class */
public class TransactionLowWaterMarkTest extends TransactionTestBase {
    // Class logger; none of the methods visible in this class reference it.
    private static final Logger log = LoggerFactory.getLogger(TransactionLowWaterMarkTest.class);
    // Fully-qualified name of the single topic that every test here produces to and consumes from.
    private static final String TOPIC = "persistent://tnx/ns1/test-topic";

    /**
     * Brings up the shared test environment and blocks until the first broker
     * exposes the expected number of transaction metadata stores, each of them
     * in the {@code Ready} state.
     *
     * @throws Exception if the base set-up fails
     */
    @BeforeMethod(alwaysRun = true)
    protected void setup() throws Exception {
        // NOTE(review): the second setUpBase argument is presumably the number of
        // transaction-coordinator partitions, which is why the same value is the
        // expected store count below -- confirm against TransactionTestBase.
        final int expectedStoreCount = 16;
        setUpBase(1, expectedStoreCount, TOPIC, 0);

        // Only the values are inspected, so a wildcard view avoids raw types and key casts.
        Map<?, ?> stores = getPulsarServiceList().get(0).getTransactionMetadataStoreService().getStores();
        Awaitility.await().until(() -> stores.size() == expectedStoreCount
                && stores.values().stream().allMatch(store ->
                        ((MLTransactionMetadataStore) store).getState()
                                == TransactionMetadataStoreState.State.Ready));
    }

    /**
     * Tears down whatever {@link #setup()} created by delegating to the base
     * class clean-up. Runs after every test method, including failed ones
     * ({@code alwaysRun = true}).
     *
     * @throws Exception if the base class clean-up fails
     */
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Checks how the topic treats messages produced with a transaction that has
     * already been committed, and that a later transaction handled by the same
     * coordinator still works.
     *
     * <p>Sequence asserted by this test:
     * <ol>
     *   <li>{@code test1} is produced in a transaction, the transaction commits, and the
     *       consumer receives exactly that one message;</li>
     *   <li>the client-side state of the same transaction is forced back to {@code OPEN}
     *       via reflection, {@code test2} is produced with it, and a second commit must
     *       fail with a {@code TransactionNotFoundException} cause;</li>
     *   <li>a new transaction whose most-significant id bits match the first one is opened,
     *       {@code test3} is produced with it, nothing is delivered before the commit and
     *       {@code test3} is the next message delivered after it.</li>
     * </ol>
     *
     * @throws Exception on any client, reflection or assertion failure
     */
    @Test
    public void testTransactionBufferLowWaterMark() throws Exception {
        TransactionImpl txn = (TransactionImpl) this.pulsarClient.newTransaction()
                .withTransactionTimeout(5L, TimeUnit.SECONDS)
                .build()
                .get();

        // Resources are closed in reverse order (consumer first, then producer).
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(TOPIC)
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .enableBatching(false)
                     .create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(TOPIC)
                     .subscriptionName("test")
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .enableBatchIndexAcknowledgment(true)
                     .subscriptionType(SubscriptionType.Failover)
                     .subscribe()) {

            producer.newMessage(txn).value("test1".getBytes()).send();
            txn.commit().get();

            Message<byte[]> first = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNotNull(first, "The committed message 'test1' was not delivered in time.");
            Assert.assertEquals(new String(first.getData()), "test1");
            Assert.assertNull(consumer.receive(2, TimeUnit.SECONDS));

            // Pretend on the client side that the committed transaction is still open,
            // so the client lets us produce and commit with it again.
            Field stateField = TransactionImpl.class.getDeclaredField("state");
            stateField.setAccessible(true);
            stateField.set(txn, Transaction.State.OPEN);
            producer.newMessage(txn).value("test2".getBytes()).send();
            try {
                txn.commit().get();
                Assert.fail("The commit operation should be failed.");
            } catch (Exception e) {
                Assert.assertTrue(
                        e.getCause() instanceof TransactionCoordinatorClientException.TransactionNotFoundException);
            }

            // Open at most one transaction per coordinator partition until one shares the
            // most-significant id bits of the first transaction.
            PartitionedTopicMetadata coordinatorMetadata = this.pulsarClient.getLookup()
                    .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN)
                    .get();
            TransactionImpl sameCoordinatorTxn = null;
            for (int i = 0; i < coordinatorMetadata.partitions; i++) {
                sameCoordinatorTxn = (TransactionImpl) this.pulsarClient.newTransaction()
                        .withTransactionTimeout(5L, TimeUnit.SECONDS)
                        .build()
                        .get();
                if (sameCoordinatorTxn.getTxnIdMostBits() == txn.getTxnIdMostBits()) {
                    break;
                }
            }
            if (sameCoordinatorTxn == null || sameCoordinatorTxn.getTxnIdMostBits() != txn.getTxnIdMostBits()) {
                Assert.fail("Could not open a transaction with the same most-significant id bits as the first one.");
            }

            producer.newMessage(sameCoordinatorTxn).value("test3".getBytes()).send();
            Assert.assertNull(consumer.receive(2, TimeUnit.SECONDS));
            sameCoordinatorTxn.commit().get();
            Assert.assertEquals(new String(consumer.receive().getData()), "test3");
        }
    }

    /**
     * Checks when transaction ids appear in, and disappear from, the subscription's
     * {@code individualAckOfTransaction} map (read reflectively from the pending-ack handle).
     *
     * <p>Sequence asserted by this test:
     * <ol>
     *   <li>acknowledging {@code test1} within a transaction registers the transaction id;</li>
     *   <li>committing removes it;</li>
     *   <li>after the client-side state is forced back to {@code OPEN}, acknowledging
     *       {@code test2} with the same transaction registers the id again;</li>
     *   <li>a second transaction with the same most-significant id bits acknowledges
     *       {@code test3}; both ids are present before it commits and both are gone after.</li>
     * </ol>
     *
     * @throws Exception on any client, reflection or assertion failure
     */
    @Test
    public void testPendingAckLowWaterMark() throws Exception {
        TransactionImpl txn = (TransactionImpl) this.pulsarClient.newTransaction()
                .withTransactionTimeout(5L, TimeUnit.SECONDS)
                .build()
                .get();
        TxnID txnId = new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());

        // Resources are closed in reverse order (consumer first, then producer).
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(TOPIC)
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .enableBatching(false)
                     .create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(TOPIC)
                     .subscriptionName("test")
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .enableBatchIndexAcknowledgment(true)
                     .subscriptionType(SubscriptionType.Failover)
                     .subscribe()) {

            producer.send("test1".getBytes());
            producer.send("test2".getBytes());
            producer.send("test3".getBytes());

            Message<byte[]> first = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNotNull(first, "The message 'test1' was not delivered in time.");
            Assert.assertEquals(new String(first.getData()), "test1");
            consumer.acknowledgeAsync(first.getMessageId(), txn).get();

            LinkedMap<?, ?> pendingAcks = lookupIndividualAckOfTransaction("test");
            Assert.assertNotNull(pendingAcks, "No broker holds a loaded topic " + TOPIC);
            Assert.assertTrue(pendingAcks.containsKey(txnId));

            txn.commit().get();
            // Pretend on the client side that the committed transaction is still open,
            // so the client lets us acknowledge with it again.
            Field stateField = TransactionImpl.class.getDeclaredField("state");
            stateField.setAccessible(true);
            stateField.set(txn, Transaction.State.OPEN);
            Assert.assertFalse(pendingAcks.containsKey(txnId));

            Message<byte[]> second = consumer.receive();
            Assert.assertEquals(new String(second.getData()), "test2");
            consumer.acknowledgeAsync(second.getMessageId(), txn).get();
            Assert.assertTrue(pendingAcks.containsKey(txnId));

            // Open at most one transaction per coordinator partition until one shares the
            // most-significant id bits of the first transaction.
            PartitionedTopicMetadata coordinatorMetadata = this.pulsarClient.getLookup()
                    .getPartitionedTopicMetadata(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN)
                    .get();
            TransactionImpl sameCoordinatorTxn = null;
            for (int i = 0; i < coordinatorMetadata.partitions; i++) {
                sameCoordinatorTxn = (TransactionImpl) this.pulsarClient.newTransaction()
                        .withTransactionTimeout(5L, TimeUnit.SECONDS)
                        .build()
                        .get();
                if (sameCoordinatorTxn.getTxnIdMostBits() == txn.getTxnIdMostBits()) {
                    break;
                }
            }
            if (sameCoordinatorTxn == null || sameCoordinatorTxn.getTxnIdMostBits() != txn.getTxnIdMostBits()) {
                Assert.fail("Could not open a transaction with the same most-significant id bits as the first one.");
            }
            TxnID sameCoordinatorTxnId =
                    new TxnID(sameCoordinatorTxn.getTxnIdMostBits(), sameCoordinatorTxn.getTxnIdLeastBits());

            producer.newMessage(sameCoordinatorTxn).value("test3".getBytes()).send();
            Message<byte[]> third = consumer.receive(2, TimeUnit.SECONDS);
            Assert.assertNotNull(third, "The message 'test3' was not delivered in time.");
            Assert.assertEquals(new String(third.getData()), "test3");
            consumer.acknowledgeAsync(third.getMessageId(), sameCoordinatorTxn).get();
            Assert.assertTrue(pendingAcks.containsKey(txnId));
            Assert.assertTrue(pendingAcks.containsKey(sameCoordinatorTxnId));

            sameCoordinatorTxn.commit().get();
            Assert.assertFalse(pendingAcks.containsKey(txnId));
            Assert.assertFalse(pendingAcks.containsKey(sameCoordinatorTxnId));
        }
    }

    /**
     * Reflectively reads the {@code individualAckOfTransaction} map of the named
     * subscription on {@link #TOPIC}, scanning every broker of the test cluster.
     *
     * @param subscriptionName name of the subscription whose pending-ack handle is inspected
     * @return the live map from the last broker that has the topic loaded, or {@code null}
     *         when no broker has it
     * @throws Exception if a reflective lookup or the topic future fails
     */
    private LinkedMap<?, ?> lookupIndividualAckOfTransaction(String subscriptionName) throws Exception {
        // The reflective handles do not depend on the broker, so resolve them once.
        Field topicsField = BrokerService.class.getDeclaredField("topics");
        topicsField.setAccessible(true);
        Field pendingAckHandleField = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
        pendingAckHandleField.setAccessible(true);
        Field individualAckField = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
        individualAckField.setAccessible(true);

        LinkedMap<?, ?> found = null;
        for (int i = 0; i < getPulsarServiceList().size(); i++) {
            // Unchecked: the generic signature of the private field is not visible through reflection.
            @SuppressWarnings("unchecked")
            ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
                    (ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>)
                            topicsField.get(getPulsarServiceList().get(i).getBrokerService());
            CompletableFuture<Optional<Topic>> topicFuture = topics.get(TOPIC);
            if (topicFuture == null) {
                continue;
            }
            Optional<Topic> topic = topicFuture.get();
            if (topic.isPresent()) {
                PersistentSubscription subscription =
                        (PersistentSubscription) topic.get().getSubscription(subscriptionName);
                PendingAckHandleImpl pendingAckHandle =
                        (PendingAckHandleImpl) pendingAckHandleField.get(subscription);
                found = (LinkedMap<?, ?>) individualAckField.get(pendingAckHandle);
            }
        }
        return found;
    }

    /**
     * Checks that producing with an already-committed transaction is rejected and
     * leaves no pending write operation on the topic.
     *
     * <p>Two transactions with the same most-significant id bits each produce one
     * message and commit. The client-side state of the first one is then forced back
     * to {@code OPEN}; producing with it again must throw
     * {@link PulsarClientException.NotAllowedException}, and the topic's pending-write
     * counter must read zero afterwards.
     *
     * @throws Exception on any client, reflection or assertion failure
     */
    @Test
    public void testTBLowWaterMarkEndToEnd() throws Exception {
        Transaction firstTxn = this.pulsarClient.newTransaction()
                .withTransactionTimeout(500L, TimeUnit.SECONDS)
                .build()
                .get();
        // Keep opening transactions until one shares the first one's most-significant id bits.
        Transaction secondTxn = this.pulsarClient.newTransaction()
                .withTransactionTimeout(500L, TimeUnit.SECONDS)
                .build()
                .get();
        while (secondTxn.getTxnID().getMostSigBits() != firstTxn.getTxnID().getMostSigBits()) {
            secondTxn = this.pulsarClient.newTransaction()
                    .withTransactionTimeout(500L, TimeUnit.SECONDS)
                    .build()
                    .get();
        }

        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                .topic(TOPIC)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(false)
                .create()) {
            producer.newMessage(firstTxn).send();
            producer.newMessage(secondTxn).send();
            firstTxn.commit().get();
            secondTxn.commit().get();

            // Pretend on the client side that the committed transaction is still open,
            // so the client attempts the send and the rejection has to come back to it.
            Field stateField = TransactionImpl.class.getDeclaredField("state");
            stateField.setAccessible(true);
            stateField.set(firstTxn, Transaction.State.OPEN);

            PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
                    .getBrokerService()
                    .getTopic(TopicName.get(TOPIC).toString(), false)
                    .get()
                    .orElseThrow();
            try {
                producer.newMessage(firstTxn).send();
                Assert.fail("Producing with an already committed transaction should not be allowed.");
            } catch (PulsarClientException.NotAllowedException expected) {
                // Expected: the send with the committed transaction is refused.
            }
            Assert.assertEquals(persistentTopic.getPendingWriteOps().get(), 0L);
        }
    }

    /**
     * Checks that re-opened transactions stay tracked by the pending-ack handle and the
     * transaction buffer until a later transaction from the first coordinator commits.
     *
     * <p>Four transactions are used: {@code txnA1} and {@code txnA2} share their
     * most-significant id bits, {@code txnB1} and {@code txnB2} share a different value.
     * After {@code txnA1} and {@code txnB1} commit, their client-side state is forced back
     * to {@code OPEN} and both produce and acknowledge once more. The test then asserts:
     * <ul>
     *   <li>after {@code txnB2} commits, both re-opened ids are still reported as ongoing
     *       by {@code checkTxnIsOngoingInTP} and {@code checkTxnIsOngoingInTB};</li>
     *   <li>after {@code txnA2} commits as well, neither id is reported any more.</li>
     * </ul>
     *
     * @throws Exception on any client, reflection or assertion failure
     */
    @Test
    public void testLowWaterMarkForDifferentTC() throws Exception {
        final String subscriptionName = "sub";

        // Resources are closed in reverse order (consumer first, then producer).
        try (Producer<byte[]> producer = this.pulsarClient.newProducer()
                     .topic(TOPIC)
                     .sendTimeout(0, TimeUnit.SECONDS)
                     .create();
             Consumer<byte[]> consumer = this.pulsarClient.newConsumer()
                     .topic(TOPIC)
                     .subscriptionName(subscriptionName)
                     .subscribe()) {

            Transaction txnA1 = newFiveHundredSecondTransaction();
            // txnB1 must differ from txnA1 in its most-significant id bits.
            Transaction txnB1 = newFiveHundredSecondTransaction();
            while (txnB1.getTxnID().getMostSigBits() == txnA1.getTxnID().getMostSigBits()) {
                txnB1 = newFiveHundredSecondTransaction();
            }
            // txnB2 must match txnB1.
            Transaction txnB2 = newFiveHundredSecondTransaction();
            while (txnB2.getTxnID().getMostSigBits() != txnB1.getTxnID().getMostSigBits()) {
                txnB2 = newFiveHundredSecondTransaction();
            }
            // txnA2 must match txnA1.
            Transaction txnA2 = newFiveHundredSecondTransaction();
            while (txnA2.getTxnID().getMostSigBits() != txnA1.getTxnID().getMostSigBits()) {
                txnA2 = newFiveHundredSecondTransaction();
            }

            // Ten non-transactional messages give the consumer something to acknowledge below.
            for (int i = 0; i < 10; i++) {
                producer.newMessage().send();
            }
            producer.newMessage(txnA1).send();
            producer.newMessage(txnB1).send();
            producer.newMessage(txnB2).send();
            producer.newMessage(txnA2).send();

            consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), txnA1);
            consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), txnB1);
            consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), txnB2);
            consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), txnA2);

            txnA1.commit().get();
            txnB1.commit().get();

            // Pretend on the client side that the two committed transactions are still open.
            Field stateField = TransactionImpl.class.getDeclaredField("state");
            stateField.setAccessible(true);
            stateField.set(txnA1, Transaction.State.OPEN);
            stateField.set(txnB1, Transaction.State.OPEN);

            producer.newMessage(txnA1).send();
            producer.newMessage(txnB1).send();
            consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), txnA1);
            consumer.acknowledgeAsync(consumer.receive(5, TimeUnit.SECONDS).getMessageId(), txnB1);

            txnB2.commit().get();
            TxnID idA1 = txnA1.getTxnID();
            TxnID idB1 = txnB1.getTxnID();
            Awaitility.await().untilAsserted(() -> {
                Assert.assertTrue(checkTxnIsOngoingInTP(idA1, subscriptionName));
                Assert.assertTrue(checkTxnIsOngoingInTP(idB1, subscriptionName));
                Assert.assertTrue(checkTxnIsOngoingInTB(idA1));
                Assert.assertTrue(checkTxnIsOngoingInTB(idB1));
            });

            txnA2.commit().get();
            Awaitility.await().untilAsserted(() -> {
                Assert.assertFalse(checkTxnIsOngoingInTP(idA1, subscriptionName));
                Assert.assertFalse(checkTxnIsOngoingInTP(idB1, subscriptionName));
                Assert.assertFalse(checkTxnIsOngoingInTB(idA1));
                Assert.assertFalse(checkTxnIsOngoingInTB(idB1));
            });
        }
    }

    /**
     * Opens a new client transaction with a 500-second timeout and waits for it to be created.
     *
     * @return the newly opened transaction
     * @throws Exception if the transaction cannot be created
     */
    private Transaction newFiveHundredSecondTransaction() throws Exception {
        return this.pulsarClient.newTransaction()
                .withTransactionTimeout(500L, TimeUnit.SECONDS)
                .build()
                .get();
    }

    /**
     * Tells whether the given transaction id is a key of the
     * {@code individualAckOfTransaction} map held by the pending-ack handle of a
     * subscription on {@link #TOPIC}. Both private fields are read through reflection
     * on the first broker of the test cluster.
     *
     * @param txnID transaction id to look for
     * @param str   name of the subscription to inspect
     * @return {@code true} if the map currently contains {@code txnID}
     * @throws Exception if the topic lookup or a reflective access fails
     */
    private boolean checkTxnIsOngoingInTP(TxnID txnID, String str) throws Exception {
        Optional<Topic> loadedTopic = getPulsarServiceList().get(0)
                .getBrokerService()
                .getTopic(TopicName.get(TOPIC).toString(), false)
                .get();
        PersistentTopic persistentTopic = (PersistentTopic) loadedTopic.orElseThrow();
        PersistentSubscription persistentSubscription = persistentTopic.getSubscription(str);

        Field handleField = PersistentSubscription.class.getDeclaredField("pendingAckHandle");
        handleField.setAccessible(true);
        PendingAckHandleImpl handle = (PendingAckHandleImpl) handleField.get(persistentSubscription);

        Field ackMapField = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
        ackMapField.setAccessible(true);
        LinkedMap<?, ?> individualAcks = (LinkedMap<?, ?>) ackMapField.get(handle);
        return individualAcks.containsKey(txnID);
    }

    /**
     * Tells whether the given transaction id is a key of the {@code ongoingTxns} map of
     * the transaction buffer attached to {@link #TOPIC}. The private field is read
     * through reflection on the first broker of the test cluster.
     *
     * @param txnID transaction id to look for
     * @return {@code true} if the map currently contains {@code txnID}
     * @throws Exception if the topic lookup or the reflective access fails
     */
    private boolean checkTxnIsOngoingInTB(TxnID txnID) throws Exception {
        Optional<Topic> loadedTopic = getPulsarServiceList().get(0)
                .getBrokerService()
                .getTopic(TopicName.get(TOPIC).toString(), false)
                .get();
        PersistentTopic persistentTopic = (PersistentTopic) loadedTopic.orElseThrow();

        // Explicit downcast: the reflective field below is declared on the concrete
        // TopicTransactionBuffer class. NOTE(review): getTransactionBuffer() is presumably
        // declared with the TransactionBuffer interface type -- the cast is harmless if not.
        TopicTransactionBuffer transactionBuffer =
                (TopicTransactionBuffer) persistentTopic.getTransactionBuffer();

        Field ongoingTxnsField = TopicTransactionBuffer.class.getDeclaredField("ongoingTxns");
        ongoingTxnsField.setAccessible(true);
        LinkedMap<?, ?> ongoingTxns = (LinkedMap<?, ?>) ongoingTxnsField.get(transactionBuffer);
        return ongoingTxns.containsKey(txnID);
    }
}
