package org.apache.pulsar.broker.service.persistent;

import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javassist.bytecode.Opcode;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/TopicDuplicationTest.class */
public class TopicDuplicationTest extends ProducerConsumerBase {
    private final String testTenant = "my-property";
    private final String testNamespace = "my-ns";
    private final String myNamespace = "my-property/my-ns";
    private final String testTopic = "persistent://my-property/my-ns/max-unacked-";

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    protected void setup() throws Exception {
        resetConfig();
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        this.conf.setBrokerDeduplicationEnabled(true);
        super.internalSetup();
        super.producerBaseSetup();
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    @Test(timeOut = 10000)
    public void testDuplicationApi() throws Exception {
        String str = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        this.admin.topics().createPartitionedTopic(str, 3);
        waitCacheInit(str);
        Assert.assertNull(this.admin.topics().getDeduplicationEnabled(str));
        this.admin.topics().enableDeduplication(str, true);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.admin.topics().getDeduplicationEnabled(str) != null);
        });
        Assert.assertEquals(this.admin.topics().getDeduplicationEnabled(str), true);
        this.admin.topics().disableDeduplication(str);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.admin.topics().getMaxUnackedMessagesOnSubscription(str) == null);
        });
        Assert.assertNull(this.admin.topics().getDeduplicationEnabled(str));
    }

    @Test(timeOut = 10000)
    public void testDuplicationSnapshotApi() throws Exception {
        String str = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        this.admin.topics().createPartitionedTopic(str, 3);
        waitCacheInit(str);
        Assert.assertNull(this.admin.topics().getDeduplicationSnapshotInterval(str));
        this.admin.topics().setDeduplicationSnapshotInterval(str, 1024);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.admin.topics().getDeduplicationSnapshotInterval(str) != null);
        });
        Assert.assertEquals(this.admin.topics().getDeduplicationSnapshotInterval(str).intValue(), 1024L);
        this.admin.topics().removeDeduplicationSnapshotInterval(str);
        Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.admin.topics().getDeduplicationSnapshotInterval(str) == null);
        });
        Assert.assertNull(this.admin.topics().getDeduplicationSnapshotInterval(str));
    }

    @Test(timeOut = 30000)
    public void testTopicPolicyTakeSnapshot() throws Exception {
        resetConfig();
        this.conf.setSystemTopicEnabled(true);
        this.conf.setTopicLevelPoliciesEnabled(true);
        this.conf.setBrokerDeduplicationEnabled(true);
        this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
        this.conf.setBrokerDeduplicationSnapshotIntervalSeconds(7);
        this.conf.setBrokerDeduplicationEntriesInterval(20000);
        super.internalCleanup();
        super.internalSetup();
        super.producerBaseSetup();
        String str = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(false).producerName("my-producer").create();
        try {
            waitCacheInit(str);
            this.admin.topics().setDeduplicationSnapshotInterval(str, 3);
            this.admin.namespaces().setDeduplicationSnapshotInterval("my-property/my-ns", 5);
            int i = 10;
            CountDownLatch countDownLatch = new CountDownLatch(10);
            for (int i2 = 0; i2 < 10; i2++) {
                create.newMessage().value("msg" + i2).sendAsync().whenComplete((messageId, th) -> {
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(str).get()).get();
            long longValue = ((Long) persistentTopic.getMessageDeduplication().highestSequencedPersisted.get("my-producer")).longValue();
            PositionImpl lastConfirmedEntry = persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            org.testng.Assert.assertEquals(longValue, 10 - 1);
            org.testng.Assert.assertEquals(lastConfirmedEntry.getEntryId(), 10 - 1);
            Awaitility.await().until(() -> {
                return Boolean.valueOf(persistentTopic.getMessageDeduplication().getManagedCursor().getMarkDeletedPosition().getEntryId() == ((long) (i - 1)));
            });
            ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
            org.testng.Assert.assertEquals(lastConfirmedEntry, managedCursor.getMarkDeletedPosition());
            this.admin.topics().removeDeduplicationSnapshotInterval(str);
            create.newMessage().value("msg").send();
            Awaitility.await().until(() -> {
                return Boolean.valueOf(persistentTopic.getMessageDeduplication().getManagedCursor().getMarkDeletedPosition().getEntryId() == ((long) i));
            });
            PositionImpl markDeletedPosition = managedCursor.getMarkDeletedPosition();
            PositionImpl lastConfirmedEntry2 = persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            org.testng.Assert.assertEquals(10, markDeletedPosition.getEntryId());
            org.testng.Assert.assertEquals(lastConfirmedEntry2, markDeletedPosition);
            this.admin.namespaces().removeDeduplicationSnapshotInterval("my-property/my-ns");
            Awaitility.await().until(() -> {
                return Boolean.valueOf(this.admin.namespaces().getDeduplicationSnapshotInterval("my-property/my-ns") == null);
            });
            create.newMessage().value("msg").send();
            Thread.sleep(3000L);
            PositionImpl markDeletedPosition2 = managedCursor.getMarkDeletedPosition();
            PositionImpl lastConfirmedEntry3 = persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            org.testng.Assert.assertNotEquals(Integer.valueOf(10 + 1), Long.valueOf(markDeletedPosition2.getEntryId()));
            org.testng.Assert.assertNotEquals(lastConfirmedEntry3, markDeletedPosition2);
            Awaitility.await().until(() -> {
                return Boolean.valueOf(persistentTopic.getMessageDeduplication().getManagedCursor().getMarkDeletedPosition().getEntryId() == ((long) (i + 1)));
            });
            PositionImpl markDeletedPosition3 = managedCursor.getMarkDeletedPosition();
            PositionImpl lastConfirmedEntry4 = persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            org.testng.Assert.assertEquals(10 + 1, markDeletedPosition3.getEntryId());
            org.testng.Assert.assertEquals(lastConfirmedEntry4, markDeletedPosition3);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th2) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th2;
        }
    }

    @Test(timeOut = 20000)
    public void testDuplicationMethod() throws Exception {
        String str = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        this.admin.topics().createPartitionedTopic("persistent://my-property/my-ns/max-unacked-", 3);
        waitCacheInit(str);
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).producerName("my-producer").create();
        try {
            long currentTimeMillis = System.currentTimeMillis();
            for (int i = 0; i <= 100; i++) {
                create.newMessage().value("msg-" + i).sequenceId(currentTimeMillis + i).send();
            }
            long j = currentTimeMillis + 100;
            MessageDeduplication messageDeduplication = ((Topic) ((Optional) ((CompletableFuture) this.pulsar.getBrokerService().getTopics().get(str)).get(1L, TimeUnit.SECONDS)).get()).getMessageDeduplication();
            messageDeduplication.checkStatus().whenComplete((r8, th) -> {
                if (th != null) {
                    Assert.fail("should not fail");
                }
                org.testng.Assert.assertNotNull(messageDeduplication.highestSequencedPersisted);
                org.testng.Assert.assertNotNull(messageDeduplication.highestSequencedPushed);
                org.testng.Assert.assertEquals(messageDeduplication.getLastPublishedSequenceId("my-producer"), j);
                org.testng.Assert.assertEquals(((Long) messageDeduplication.highestSequencedPersisted.get("my-producer")).longValue(), j);
                org.testng.Assert.assertEquals(((Long) messageDeduplication.highestSequencedPushed.get("my-producer")).longValue(), j);
            }).get();
            this.admin.topics().enableDeduplication(str, false);
            Awaitility.await().atMost(5L, TimeUnit.SECONDS).until(() -> {
                return Boolean.valueOf(this.admin.topics().getDeduplicationEnabled(str) != null);
            });
            for (int i2 = 0; i2 < 100; i2++) {
                create.newMessage().value("msg-" + i2).sequenceId(j + i2).send();
            }
            messageDeduplication.checkStatus().whenComplete((r6, th2) -> {
                if (th2 != null) {
                    Assert.fail("should not fail");
                }
                org.testng.Assert.assertEquals(messageDeduplication.getLastPublishedSequenceId("my-producer"), -1L);
                org.testng.Assert.assertEquals(messageDeduplication.highestSequencedPersisted.size(), 0L);
                org.testng.Assert.assertEquals(messageDeduplication.highestSequencedPushed.size(), 0L);
            }).get();
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th3) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th3;
        }
    }

    @Test(timeOut = 40000)
    public void testDuplicationSnapshot() throws Exception {
        testTakeSnapshot(true);
        testTakeSnapshot(false);
    }

    private void testTakeSnapshot(boolean z) throws Exception {
        resetConfig();
        this.conf.setBrokerDeduplicationEnabled(true);
        this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(z ? 1 : 0);
        this.conf.setBrokerDeduplicationSnapshotIntervalSeconds(1);
        this.conf.setBrokerDeduplicationEntriesInterval(20000);
        super.internalCleanup();
        super.internalSetup();
        super.producerBaseSetup();
        String str = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(false).producerName("my-producer").create();
        try {
            CountDownLatch countDownLatch = new CountDownLatch(50);
            for (int i = 0; i < 50; i++) {
                create.newMessage().value("msg" + i).sendAsync().whenComplete((messageId, th) -> {
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(str).get()).get();
            long longValue = ((Long) persistentTopic.getMessageDeduplication().highestSequencedPersisted.get("my-producer")).longValue();
            PositionImpl lastConfirmedEntry = persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            org.testng.Assert.assertEquals(longValue, 50 - 1);
            org.testng.Assert.assertEquals(lastConfirmedEntry.getEntryId(), 50 - 1);
            Thread.sleep(2000L);
            ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
            PositionImpl markDeletedPosition = managedCursor.getMarkDeletedPosition();
            if (z) {
                org.testng.Assert.assertEquals(lastConfirmedEntry, markDeletedPosition);
            } else {
                org.testng.Assert.assertNotEquals(lastConfirmedEntry, markDeletedPosition);
                org.testng.Assert.assertNotEquals(Long.valueOf(markDeletedPosition.getEntryId()), -1);
            }
            create.newMessage().value("msg").send();
            PositionImpl markDeletedPosition2 = managedCursor.getMarkDeletedPosition();
            PositionImpl lastConfirmedEntry2 = persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            org.testng.Assert.assertNotEquals(50, Long.valueOf(markDeletedPosition2.getEntryId()));
            org.testng.Assert.assertNotNull(lastConfirmedEntry2);
            Thread.sleep(2000L);
            PositionImpl markDeletedPosition3 = managedCursor.getMarkDeletedPosition();
            PositionImpl lastConfirmedEntry3 = persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            if (z) {
                org.testng.Assert.assertEquals(50, markDeletedPosition3.getEntryId());
                org.testng.Assert.assertEquals(lastConfirmedEntry3, markDeletedPosition3);
            } else {
                org.testng.Assert.assertNotEquals(50, Long.valueOf(markDeletedPosition3.getEntryId()));
                org.testng.Assert.assertNotEquals(lastConfirmedEntry3, markDeletedPosition3);
            }
        } finally {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        }
    }

    @Test(timeOut = 30000)
    public void testNamespacePolicyApi() throws Exception {
        Assert.assertNull(this.admin.namespaces().getDeduplicationSnapshotInterval("my-property/my-ns"));
        this.admin.namespaces().setDeduplicationSnapshotInterval("my-property/my-ns", 100);
        org.testng.Assert.assertEquals(this.admin.namespaces().getDeduplicationSnapshotInterval("my-property/my-ns").intValue(), 100);
        this.admin.namespaces().removeDeduplicationSnapshotInterval("my-property/my-ns");
        Assert.assertNull(this.admin.namespaces().getDeduplicationSnapshotInterval("my-property/my-ns"));
        this.admin.namespaces().setDeduplicationSnapshotIntervalAsync("my-property/my-ns", Integer.valueOf(Opcode.GOTO_W)).get();
        org.testng.Assert.assertEquals(((Integer) this.admin.namespaces().getDeduplicationSnapshotIntervalAsync("my-property/my-ns").get()).intValue(), Opcode.GOTO_W);
        this.admin.namespaces().removeDeduplicationSnapshotIntervalAsync("my-property/my-ns").get();
        Assert.assertNull((Integer) this.admin.namespaces().getDeduplicationSnapshotIntervalAsync("my-property/my-ns").get());
    }

    @Test(timeOut = 30000)
    private void testNamespacePolicyTakeSnapshot() throws Exception {
        resetConfig();
        this.conf.setBrokerDeduplicationEnabled(true);
        this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
        this.conf.setBrokerDeduplicationSnapshotIntervalSeconds(3);
        this.conf.setBrokerDeduplicationEntriesInterval(20000);
        super.internalCleanup();
        super.internalSetup();
        super.producerBaseSetup();
        String str = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(false).producerName("my-producer").create();
        try {
            this.admin.namespaces().setDeduplicationSnapshotInterval("my-property/my-ns", 1);
            int i = 50;
            CountDownLatch countDownLatch = new CountDownLatch(50);
            for (int i2 = 0; i2 < 50; i2++) {
                create.newMessage().value("msg" + i2).sendAsync().whenComplete((messageId, th) -> {
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(str).get()).get();
            long longValue = ((Long) persistentTopic.getMessageDeduplication().highestSequencedPersisted.get("my-producer")).longValue();
            PositionImpl lastConfirmedEntry = persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            org.testng.Assert.assertEquals(longValue, 50 - 1);
            org.testng.Assert.assertEquals(lastConfirmedEntry.getEntryId(), 50 - 1);
            Awaitility.await().atMost(2500L, TimeUnit.MILLISECONDS).until(() -> {
                return Boolean.valueOf(persistentTopic.getMessageDeduplication().getManagedCursor().getMarkDeletedPosition().getEntryId() == ((long) (i - 1)));
            });
            ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
            org.testng.Assert.assertEquals(lastConfirmedEntry, managedCursor.getMarkDeletedPosition());
            this.admin.namespaces().removeDeduplicationSnapshotInterval("my-property/my-ns");
            Thread.sleep(2000L);
            PositionImpl markDeletedPosition = managedCursor.getMarkDeletedPosition();
            PositionImpl lastConfirmedEntry2 = persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            org.testng.Assert.assertNotEquals(Integer.valueOf(50 - 1), Long.valueOf(markDeletedPosition.getEntryId()));
            org.testng.Assert.assertNotEquals(lastConfirmedEntry2, Long.valueOf(markDeletedPosition.getEntryId()));
            Thread.sleep(1000L);
            PositionImpl markDeletedPosition2 = managedCursor.getMarkDeletedPosition();
            PositionImpl lastConfirmedEntry3 = persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            org.testng.Assert.assertEquals(50 - 1, markDeletedPosition2.getEntryId());
            org.testng.Assert.assertEquals(lastConfirmedEntry3, markDeletedPosition2);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th2) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th2;
        }
    }

    @Test(timeOut = 30000)
    private void testDisableNamespacePolicyTakeSnapshot() throws Exception {
        resetConfig();
        this.conf.setBrokerDeduplicationEnabled(true);
        this.conf.setBrokerDeduplicationSnapshotFrequencyInSeconds(1);
        this.conf.setBrokerDeduplicationSnapshotIntervalSeconds(1);
        this.conf.setBrokerDeduplicationEntriesInterval(20000);
        super.internalCleanup();
        super.internalSetup();
        super.producerBaseSetup();
        String str = "persistent://my-property/my-ns/max-unacked-" + UUID.randomUUID().toString();
        Producer create = this.pulsarClient.newProducer(Schema.STRING).topic(str).enableBatching(false).producerName("my-producer").create();
        try {
            this.admin.namespaces().setDeduplicationSnapshotInterval("my-property/my-ns", 0);
            CountDownLatch countDownLatch = new CountDownLatch(50);
            for (int i = 0; i < 50; i++) {
                create.newMessage().value("msg" + i).sendAsync().whenComplete((messageId, th) -> {
                    countDownLatch.countDown();
                });
            }
            countDownLatch.await();
            PersistentTopic persistentTopic = (PersistentTopic) ((Optional) this.pulsar.getBrokerService().getTopicIfExists(str).get()).get();
            ManagedCursor managedCursor = persistentTopic.getMessageDeduplication().getManagedCursor();
            PositionImpl markDeletedPosition = managedCursor.getMarkDeletedPosition();
            long longValue = ((Long) persistentTopic.getMessageDeduplication().highestSequencedPersisted.get("my-producer")).longValue();
            PositionImpl lastConfirmedEntry = persistentTopic.getMessageDeduplication().getManagedCursor().getManagedLedger().getLastConfirmedEntry();
            org.testng.Assert.assertEquals(longValue, 50 - 1);
            org.testng.Assert.assertEquals(lastConfirmedEntry.getEntryId(), 50 - 1);
            Awaitility.await().atMost(2L, TimeUnit.SECONDS).until(() -> {
                return Boolean.valueOf(persistentTopic.getMessageDeduplication().getManagedCursor().getMarkDeletedPosition().getEntryId() == -1);
            });
            org.testng.Assert.assertEquals(markDeletedPosition, managedCursor.getMarkDeletedPosition());
            org.testng.Assert.assertEquals(markDeletedPosition.getEntryId(), -1L);
            org.testng.Assert.assertNotEquals(lastConfirmedEntry, markDeletedPosition);
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
        } catch (Throwable th2) {
            if (Collections.singletonList(create).get(0) != null) {
                create.close();
            }
            throw th2;
        }
    }

    private void waitCacheInit(String str) throws Exception {
        this.pulsarClient.newConsumer().topic(str).subscriptionName("my-sub").subscribe().close();
        TopicName topicName = TopicName.get(str);
        Awaitility.await().atMost(10L, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(this.pulsar.getTopicPoliciesService().cacheIsInitialized(topicName));
        });
    }
}
