package org.apache.pulsar.broker.transaction.pendingack.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.util.LogIndexLagBackoff;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

/**
 * Tests {@link MLPendingAckStore} against a real broker started by {@link TransactionTestBase}:
 * writing pending-ack log records, building the sparse {@code pendingAckLogIndex}, replaying the
 * log into a second store, and trimming the index via {@code clearUselessLogData()}.
 */
@Test(groups = {"broker"})
public class MLPendingAckStoreTest extends TransactionTestBase {
    private static final Logger log = LoggerFactory.getLogger(MLPendingAckStoreTest.class);

    /** Topic created in {@link #setup()} and used to obtain the subscription under test. */
    private static final String TOPIC_NAME = "tnx/ns1/test-txn-topic";
    /** Value pushed into the broker config and used to compute the expected index positions. */
    private static final int PENDING_ACK_LOG_INDEX_MIN_LAG = 1;
    /** Record-count threshold applied to every buffered-writer config built by this test. */
    private static final int BATCHED_WRITE_MAX_RECORDS = 2;
    /** One hour; presumably large enough that a time-triggered flush never fires during a test. */
    private static final int BATCHED_WRITE_MAX_DELAY_MILLIS = 3600000;

    private PersistentSubscription persistentSubscriptionMock;
    private ManagedCursor managedCursorMock;
    private ExecutorService internalPinnedExecutor;

    /**
     * Starts the test cluster, creates the topic and the {@code "test"} subscription, and wraps
     * both the subscription and its cursor in Mockito spies so individual calls can be stubbed.
     *
     * @throws Exception if the cluster, topic or subscription cannot be created
     */
    @BeforeMethod
    protected void setup() throws Exception {
        setUpBase(1, 1, "tnx/ns1/test", 0);
        this.admin.topics().createNonPartitionedTopic(TOPIC_NAME);
        PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
                .getBrokerService().getTopic(TOPIC_NAME, false).get().get();
        getPulsarServiceList().get(0).getConfig()
                .setTransactionPendingAckLogIndexMinLag(PENDING_ACK_LOG_INDEX_MIN_LAG);
        PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic
                .createSubscription("test", CommandSubscribe.InitialPosition.Earliest, false, null).get();
        this.managedCursorMock = Mockito.spy(persistentSubscription.getCursor());
        this.persistentSubscriptionMock = Mockito.spy(persistentSubscription);
        // Hand out the spied cursor so getPersistentMarkDeletedPosition() can be stubbed later.
        Mockito.when(this.persistentSubscriptionMock.getCursor()).thenReturn(this.managedCursorMock);
        this.internalPinnedExecutor = this.persistentSubscriptionMock.getTopic().getBrokerService()
                .getPulsar().getTransactionExecutorProvider().getExecutor(this);
    }

    /**
     * Tears down everything started by {@link #setup()}.
     *
     * <p>{@code alwaysRun = true} makes TestNG invoke this even when {@code setup()} itself
     * failed, so a half-started cluster is not leaked into the following tests.
     */
    @AfterMethod(alwaysRun = true)
    public void cleanup() {
        super.internalCleanup();
    }

    /**
     * Copies the buffered-writer settings into the broker configuration and asks a fresh
     * {@link MLPendingAckStoreProvider} for a store bound to the spied subscription.
     *
     * @param txnLogBufferedWriterConfig batching settings to apply before the store is created
     * @return the newly created pending-ack store
     * @throws Exception if the store cannot be created
     */
    private MLPendingAckStore createPendingAckStore(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig)
            throws Exception {
        MLPendingAckStoreProvider storeProvider = new MLPendingAckStoreProvider();
        ServiceConfiguration configuration = this.persistentSubscriptionMock.getTopic().getBrokerService()
                .getPulsar().getConfiguration();
        configuration.setTransactionPendingAckBatchedWriteMaxRecords(
                txnLogBufferedWriterConfig.getBatchedWriteMaxRecords());
        configuration.setTransactionPendingAckBatchedWriteMaxSize(
                txnLogBufferedWriterConfig.getBatchedWriteMaxSize());
        configuration.setTransactionPendingAckBatchedWriteMaxDelayInMillis(
                txnLogBufferedWriterConfig.getBatchedWriteMaxDelayInMillis());
        configuration.setTransactionPendingAckBatchedWriteEnabled(txnLogBufferedWriterConfig.isBatchEnabled());
        return (MLPendingAckStore) storeProvider.newPendingAckStore(this.persistentSubscriptionMock).get();
    }

    /**
     * Builds the buffered-writer configuration shared by the write and the replay phase.
     *
     * @param batchEnabled whether batched writes are enabled
     * @return a config with the test's fixed record-count and delay thresholds
     */
    private static TxnLogBufferedWriterConfig newBufferedWriterConfig(boolean batchEnabled) {
        TxnLogBufferedWriterConfig config = new TxnLogBufferedWriterConfig();
        config.setBatchEnabled(batchEnabled);
        config.setBatchedWriteMaxRecords(BATCHED_WRITE_MAX_RECORDS);
        config.setBatchedWriteMaxDelayInMillis(BATCHED_WRITE_MAX_DELAY_MILLIS);
        return config;
    }

    /**
     * Supplies every combination of (write with batch, read with batch).
     *
     * @return four rows of two booleans each
     */
    @DataProvider(name = "mainProcessArgs")
    public Object[][] mainProcessArgsProvider() {
        return new Object[][]{
                {true, true},
                {false, false},
                {true, false},
                {false, true}
        };
    }

    /**
     * Writes 50 log records, then verifies the sparse index built while writing, the index
     * rebuilt by replaying into a second store, and the trimming done by
     * {@code clearUselessLogData()}.
     *
     * @param writeWithBatch whether the writing store uses batched writes
     * @param readWithBatch whether the replaying store uses batched writes
     * @throws Exception if any store operation fails
     */
    @Test(dataProvider = "mainProcessArgs")
    public void testMainProcess(boolean writeWithBatch, boolean readWithBatch) throws Exception {
        // Write phase: 20 cumulative acks, 10 commit marks, 10 abort marks, 10 more cumulative acks.
        MLPendingAckStore storeForWrite = createPendingAckStore(newBufferedWriterConfig(writeWithBatch));
        List<CompletableFuture<Void>> appendFutures = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            appendFutures.add(storeForWrite.appendCumulativeAck(new TxnID(i, i), PositionImpl.get(i, i)));
        }
        for (int i = 0; i < 10; i++) {
            appendFutures.add(storeForWrite.appendCommitMark(new TxnID(i, i), CommandAck.AckType.Cumulative));
        }
        for (int i = 10; i < 20; i++) {
            appendFutures.add(storeForWrite.appendAbortMark(new TxnID(i, i), CommandAck.AckType.Cumulative));
        }
        for (int i = 40; i < 50; i++) {
            appendFutures.add(storeForWrite.appendCumulativeAck(new TxnID(i, i), PositionImpl.get(i, i)));
        }
        FutureUtil.waitForAll(appendFutures).get();

        // Expected index: entry ids 0..49, minus the commit/abort marks written as records 20..39.
        List<Long> writtenEntryIds = new ArrayList<>();
        for (long entryId = 0; entryId < 50; entryId++) {
            writtenEntryIds.add(entryId);
        }
        LinkedHashSet<Long> skippedEntryIds = new LinkedHashSet<>();
        for (long entryId = 20; entryId < 40; entryId++) {
            skippedEntryIds.add(entryId);
        }
        if (writeWithBatch) {
            // With batching on, even entry ids are excluded too; presumably only the last
            // record of each two-record batch can be indexed (TODO confirm against the store).
            for (long entryId = 0; entryId < 50; entryId += 2) {
                skippedEntryIds.add(entryId);
            }
        }
        List<Long> actualIndexEntryIds = storeForWrite.pendingAckLogIndex.keySet().stream()
                .map(PositionImpl::getEntryId)
                .collect(Collectors.toList());
        List<Long> expectedIndexEntryIds =
                new ArrayList<>(calculatePendingAckIndexes(writtenEntryIds, skippedEntryIds));
        Assert.assertEquals(actualIndexEntryIds, expectedIndexEntryIds);

        // Replay phase: a second store replays the log through a mocked pending-ack handle.
        MLPendingAckStore storeForRead = createPendingAckStore(newBufferedWriterConfig(readWithBatch));
        PendingAckHandleImpl pendingAckHandle = Mockito.mock(PendingAckHandleImpl.class);
        Mockito.when(pendingAckHandle.getInternalPinnedExecutor()).thenReturn(this.internalPinnedExecutor);
        Mockito.when(pendingAckHandle.changeToReadyState()).thenReturn(true);
        AtomicInteger replayCompletions = new AtomicInteger();
        // Count completeHandleFuture() calls so the test can wait for the replay to finish.
        Answer<Void> countCompletion = (InvocationOnMock invocation) -> {
            replayCompletions.incrementAndGet();
            return null;
        };
        Mockito.doAnswer(countCompletion).when(pendingAckHandle).completeHandleFuture();
        storeForRead.replayAsync(pendingAckHandle, this.internalPinnedExecutor);
        Awaitility.await().atMost(2L, TimeUnit.SECONDS).until(() -> replayCompletions.get() == 1);

        // The replayed index must match the written one entry by entry, in the same order.
        Assert.assertEquals(storeForRead.pendingAckLogIndex.size(), storeForWrite.pendingAckLogIndex.size());
        Iterator<Map.Entry<PositionImpl, PositionImpl>> writtenIndexEntries =
                storeForWrite.pendingAckLogIndex.entrySet().iterator();
        for (Map.Entry<PositionImpl, PositionImpl> replayed : storeForRead.pendingAckLogIndex.entrySet()) {
            Map.Entry<PositionImpl, PositionImpl> written = writtenIndexEntries.next();
            Assert.assertEquals(replayed.getKey(), written.getKey());
            Assert.assertEquals(replayed.getValue().getLedgerId(), written.getValue().getLedgerId());
            Assert.assertEquals(replayed.getValue().getEntryId(), written.getValue().getEntryId());
        }

        // Trim phase: once the cursor reports 19:19 as mark-deleted, the first remaining
        // index key of each store must have an entry id above 19.
        Mockito.when(this.managedCursorMock.getPersistentMarkDeletedPosition())
                .thenReturn(PositionImpl.get(19L, 19L));
        storeForWrite.clearUselessLogData();
        storeForRead.clearUselessLogData();
        Assert.assertTrue(storeForWrite.pendingAckLogIndex.keySet().iterator().next().getEntryId() > 19);
        Assert.assertTrue(storeForRead.pendingAckLogIndex.keySet().iterator().next().getEntryId() > 19);

        storeForWrite.closeAsync().get();
        storeForRead.closeAsync().get();
    }

    /**
     * Computes which entry ids the test expects to find in the sparse index.
     *
     * <p>Every entry increments a record counter. An entry outside {@code linkedHashSet} is added
     * to the result once the counter has reached the lag returned by {@link LogIndexLagBackoff};
     * the counter is then reset and the next lag is requested using the current result size.
     *
     * @param list entry ids in write order
     * @param linkedHashSet entry ids that must never be indexed
     * @return the expected indexed entry ids, in insertion order
     */
    private LinkedHashSet<Long> calculatePendingAckIndexes(List<Long> list, LinkedHashSet<Long> linkedHashSet) {
        LogIndexLagBackoff lagBackoff =
                new LogIndexLagBackoff(PENDING_ACK_LOG_INDEX_MIN_LAG, Long.MAX_VALUE, 1.0d);
        long requiredLag = lagBackoff.next(0);
        long recordsSinceLastIndex = 0;
        LinkedHashSet<Long> indexedEntryIds = new LinkedHashSet<>();
        for (Long entryId : list) {
            // Skipped entries still count towards the lag; they are just never indexed.
            recordsSinceLastIndex++;
            if (linkedHashSet.contains(entryId) || recordsSinceLastIndex < requiredLag) {
                continue;
            }
            indexedEntryIds.add(entryId);
            requiredLag = lagBackoff.next(indexedEntryIds.size());
            recordsSinceLastIndex = 0;
        }
        return indexedEntryIds;
    }
}
