package org.apache.pulsar.broker.service;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnedBundle;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.service.ReplicatorTestBase;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.ProducerConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.DestinationName;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

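/**
 * TestNG tests for topic replication, driven through the fixtures inherited from
 * {@link ReplicatorTestBase} (the {@code url1}..{@code url3} endpoints, the {@code pulsar1}..{@code pulsar3}
 * services and the {@code admin1} client) and the cluster names {@code r1}, {@code r2} and {@code r3}.
 *
 * <p>Decompiled from {@code org/apache/pulsar/broker/service/ReplicatorTest.class}.
 */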
public class ReplicatorTest extends ReplicatorTestBase {
    private static final Logger log = LoggerFactory.getLogger(ReplicatorTest.class);

    /**
     * Class-level fixture set-up: runs once before the tests of this class (TestNG
     * {@code @BeforeClass}) and delegates entirely to {@link ReplicatorTestBase#setup()}.
     *
     * <p>Decompiler note: access modifiers were changed from package-private.
     *
     * @throws Exception whatever the base-class set-up throws
     */
    @Override // org.apache.pulsar.broker.service.ReplicatorTestBase
    @BeforeClass
    public void setup() throws Exception {
        super.setup();
    }

    /**
     * Class-level fixture tear-down: runs once after the tests of this class (TestNG
     * {@code @AfterClass}) and delegates entirely to {@link ReplicatorTestBase#shutdown()}.
     *
     * <p>Decompiler note: access modifiers were changed from package-private.
     *
     * @throws Exception whatever the base-class tear-down throws
     */
    @Override // org.apache.pulsar.broker.service.ReplicatorTestBase
    @AfterClass
    public void shutdown() throws Exception {
        super.shutdown();
    }

    /**
     * Checks that the replication-client maps keep an entry for every remote cluster while the
     * replication-cluster list of {@code pulsar/global/ns} is changed at runtime.
     *
     * <p>Ten topics are first exercised through {@code url1} (two messages produced and consumed on
     * each). The namespace's replication clusters are then reduced to {@code r1} only and afterwards
     * restored to {@code r1, r2, r3}; before the first change and after each change the maps obtained
     * from {@code ns1}, {@code ns2} and {@code ns3} must still hold a client for each of the other
     * two clusters.
     *
     * @throws Exception if an admin call fails or the test thread is interrupted while sleeping
     */
    @Test(enabled = true)
    public void testConfigChange() throws Exception {
        log.info("--- Starting ReplicatorTest::testConfigChange ---");

        List<Future<Void>> pending = Lists.newArrayList();
        for (int i = 0; i < 10; i++) {
            final DestinationName topic = DestinationName.get(String.format("persistent://pulsar/global/ns/topic-%d", i));
            Callable<Void> task = () -> {
                ReplicatorTestBase.MessageProducer producer = new ReplicatorTestBase.MessageProducer(url1, topic);
                try {
                    log.info("--- Starting producer --- " + url1);
                    ReplicatorTestBase.MessageConsumer consumer = new ReplicatorTestBase.MessageConsumer(url1, topic);
                    try {
                        log.info("--- Starting Consumer --- " + url1);
                        producer.produce(2);
                        consumer.receive(2);
                    } finally {
                        // Release the consumer even when produce/receive fails.
                        consumer.close();
                    }
                } finally {
                    producer.close();
                }
                return null;
            };
            pending.add(executor.submit(task));
        }

        for (Future<Void> outcome : pending) {
            try {
                outcome.get();
            } catch (Exception e) {
                log.error("exception in getting future result ", e);
                // Pass the exception along so the TestNG report keeps the full stack trace.
                Assert.fail(String.format("replication test failed with %s exception", e.getMessage()), e);
            }
        }

        Thread.sleep(1000L);

        // The maps are fetched once; the same instances are re-checked after each config change.
        ConcurrentOpenHashMap<String, ?> clientsOnR1 = ns1.getReplicationClients();
        ConcurrentOpenHashMap<String, ?> clientsOnR2 = ns2.getReplicationClients();
        ConcurrentOpenHashMap<String, ?> clientsOnR3 = ns3.getReplicationClients();
        assertRemoteReplicationClientsPresent(clientsOnR1, clientsOnR2, clientsOnR3);

        // Case 1: restrict replication to the local cluster only.
        admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Lists.newArrayList("r1"));
        Thread.sleep(1000L); // fixed wait for the change to be picked up
        assertRemoteReplicationClientsPresent(clientsOnR1, clientsOnR2, clientsOnR3);

        // Case 2: restore the full cluster list.
        admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Lists.newArrayList("r1", "r2", "r3"));
        Thread.sleep(1000L);
        assertRemoteReplicationClientsPresent(clientsOnR1, clientsOnR2, clientsOnR3);
    }

    /**
     * Asserts that each of the three replication-client maps has a non-null entry for the two
     * clusters other than its own.
     *
     * @param clientsOnR1 map obtained from {@code ns1}; must contain {@code r2} and {@code r3}
     * @param clientsOnR2 map obtained from {@code ns2}; must contain {@code r1} and {@code r3}
     * @param clientsOnR3 map obtained from {@code ns3}; must contain {@code r1} and {@code r2}
     */
    private static void assertRemoteReplicationClientsPresent(ConcurrentOpenHashMap<String, ?> clientsOnR1,
            ConcurrentOpenHashMap<String, ?> clientsOnR2, ConcurrentOpenHashMap<String, ?> clientsOnR3) {
        Assert.assertNotNull(clientsOnR1.get("r2"));
        Assert.assertNotNull(clientsOnR1.get("r3"));
        Assert.assertNotNull(clientsOnR2.get("r1"));
        Assert.assertNotNull(clientsOnR2.get("r3"));
        Assert.assertNotNull(clientsOnR3.get("r1"));
        Assert.assertNotNull(clientsOnR3.get("r2"));
    }

    /**
     * Invokes the private {@code PersistentTopic.startReplicator("r3")} from five threads at once and
     * verifies that the (spied) replication client for {@code r3} was asked to create a producer
     * exactly once.
     *
     * <p>The spy is installed by writing it into the {@code replicationClients} field of the
     * {@code pulsar1} broker service via reflection; it is not removed afterwards.
     *
     * @throws Exception on reflection errors, client errors, or if any concurrent invocation failed
     */
    @Test
    public void testConcurrentReplicator() throws Exception {
        log.info("--- Starting ReplicatorTest::testConcurrentReplicator ---");
        final DestinationName topicName = DestinationName.get(String.format("persistent://pulsar/global/ns1/topic-%d", 0));

        // Attach and detach a producer once; the throw-away client is closed instead of leaked.
        ClientConfiguration clientConf = new ClientConfiguration();
        clientConf.setStatsInterval(0L, TimeUnit.SECONDS);
        PulsarClient bootstrapClient = PulsarClient.create(url1.toString(), clientConf);
        try {
            bootstrapClient.createProducer(topicName.toString()).close();
        } finally {
            bootstrapClient.close();
        }

        final PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName.toString()).get();
        final PulsarClientImpl spiedClient = (PulsarClientImpl) Mockito.spy(pulsar1.getBrokerService().getReplicationClient("r3"));

        final Method startReplicator = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class);
        startReplicator.setAccessible(true);

        // Swap the broker's r3 replication client for the spy.
        Field clientsField = BrokerService.class.getDeclaredField("replicationClients");
        clientsField.setAccessible(true);
        @SuppressWarnings("unchecked") // reflective read; the cast cannot be checked at runtime
        ConcurrentOpenHashMap<String, PulsarClient> replicationClients =
                (ConcurrentOpenHashMap<String, PulsarClient>) clientsField.get(pulsar1.getBrokerService());
        replicationClients.put("r3", spiedClient);

        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            List<Future<?>> attempts = Lists.newArrayList();
            for (int i = 0; i < 5; i++) {
                attempts.add(pool.submit(() -> {
                    try {
                        startReplicator.invoke(topic, "r3");
                    } catch (Exception e) {
                        Assert.fail("setting replicator failed", e);
                    }
                }));
            }
            // An AssertionError raised on a pool thread only reaches the test through the future,
            // so every future is inspected rather than discarded.
            for (Future<?> attempt : attempts) {
                attempt.get(10L, TimeUnit.SECONDS);
            }
            // Original fixed settle time, kept in case producer creation is triggered asynchronously.
            Thread.sleep(3000L);
        } finally {
            pool.shutdownNow();
        }

        Mockito.verify(spiedClient, Mockito.times(1))
                .createProducerAsync(Mockito.anyString(), (ProducerConfiguration) Mockito.anyObject());
    }

    /**
     * (Disabled.) Expects {@code setNamespaceReplicationClusters} to be rejected with
     * {@link PulsarAdminException.PreconditionFailedException} while ownership of the full bundle of
     * {@code pulsar/global/ns} is disabled in the {@code pulsar1} ownership cache.
     *
     * <p>The ownership cache is read from the private {@code ownershipCache} field of
     * {@link NamespaceService} via reflection. Ownership is removed and re-acquired afterwards, even
     * when the expectation fails.
     *
     * @throws Exception on reflection errors or unexpected admin/ownership failures
     */
    @Test(enabled = false)
    public void testConfigChangeNegativeCases() throws Exception {
        log.info("--- Starting ReplicatorTest::testConfigChangeNegativeCases ---");

        // Validate the fixtures before they are dereferenced, so a missing fixture produces the
        // descriptive assertion message instead of a NullPointerException.
        Assert.assertNotNull(pulsar1, "pulsar1 is null");
        Assert.assertNotNull(pulsar1.getNamespaceService(), "pulsar1.getNamespaceService() is null");

        Field ownershipCacheField = NamespaceService.class.getDeclaredField("ownershipCache");
        ownershipCacheField.setAccessible(true);
        OwnershipCache ownershipCache = (OwnershipCache) ownershipCacheField.get(pulsar1.getNamespaceService());

        NamespaceBundle globalNsBundle = pulsar1.getNamespaceService().getNamespaceBundleFactory()
                .getFullBundle(new NamespaceName("pulsar/global/ns"));
        ownershipCache.tryAcquiringOwnership(globalNsBundle);
        Assert.assertNotNull(ownershipCache.getOwnedBundle(globalNsBundle), "pulsar1.getNamespaceService().getOwnedServiceUnit(new NamespaceName(\"pulsar/global/ns\")) is null");

        ownershipCache.disableOwnership(globalNsBundle);
        try {
            admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Lists.newArrayList("r1"));
            Assert.fail("Should have raised exception");
        } catch (PulsarAdminException.PreconditionFailedException expected) {
            // Expected: the change must be refused while ownership is disabled.
        } finally {
            // Always hand the bundle back in a usable state for whatever runs next.
            ownershipCache.removeOwnership(globalNsBundle).get();
            ownershipCache.tryAcquiringOwnership(globalNsBundle);
        }
    }

    /**
     * For three topics in parallel, opens one producer and one consumer on each of {@code url1},
     * {@code url2} and {@code url3} and checks that messages produced through any endpoint are
     * received by the consumers on all three.
     *
     * <p>Each task produces two messages per producer in turn (every consumer must receive two after
     * each round), then one message each from producers 1 and 2 (every consumer must receive two
     * more, one at a time).
     *
     * @throws Exception declared for TestNG; task failures are reported through {@code Assert.fail}
     */
    @Test(enabled = true)
    public void testReplication() throws Exception {
        log.info("--- Starting ReplicatorTest::testReplication ---");

        List<Future<Void>> pending = Lists.newArrayList();
        for (int i = 0; i < 3; i++) {
            final DestinationName topic = DestinationName.get(String.format("persistent://pulsar/global/ns/repltopic-%d", i));
            Callable<Void> task = () -> {
                ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(url1, topic);
                log.info("--- Starting producer --- " + url1);
                ReplicatorTestBase.MessageProducer producer2 = new ReplicatorTestBase.MessageProducer(url2, topic);
                log.info("--- Starting producer --- " + url2);
                ReplicatorTestBase.MessageProducer producer3 = new ReplicatorTestBase.MessageProducer(url3, topic);
                log.info("--- Starting producer --- " + url3);
                ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(url1, topic);
                log.info("--- Starting Consumer --- " + url1);
                ReplicatorTestBase.MessageConsumer consumer2 = new ReplicatorTestBase.MessageConsumer(url2, topic);
                log.info("--- Starting Consumer --- " + url2);
                ReplicatorTestBase.MessageConsumer consumer3 = new ReplicatorTestBase.MessageConsumer(url3, topic);
                log.info("--- Starting Consumer --- " + url3);
                try {
                    // Round 1: produce through url1, consume everywhere.
                    producer1.produce(2);
                    consumer1.receive(2);
                    consumer2.receive(2);
                    consumer3.receive(2);

                    // Round 2: produce through url2.
                    producer2.produce(2);
                    consumer1.receive(2);
                    consumer2.receive(2);
                    consumer3.receive(2);

                    // Round 3: produce through url3.
                    producer3.produce(2);
                    consumer1.receive(2);
                    consumer2.receive(2);
                    consumer3.receive(2);

                    // Round 4: one message each from two endpoints, consumed one at a time.
                    producer1.produce(1);
                    producer2.produce(1);
                    consumer1.receive(1);
                    consumer2.receive(1);
                    consumer3.receive(1);
                    consumer1.receive(1);
                    consumer2.receive(1);
                    consumer3.receive(1);
                } finally {
                    // Close the clients even when a produce/receive step fails.
                    producer1.close();
                    producer2.close();
                    producer3.close();
                    consumer1.close();
                    consumer2.close();
                    consumer3.close();
                }
                return null;
            };
            pending.add(executor.submit(task));
        }

        for (Future<Void> outcome : pending) {
            try {
                outcome.get();
            } catch (Exception e) {
                log.error("exception in getting future result ", e);
                // Keep the cause so the report shows where the task actually failed.
                Assert.fail(String.format("replication test failed with %s exception", e.getMessage()), e);
            }
        }
    }

    /**
     * (Disabled.) Checks per-message replication overrides on ten topics in parallel.
     *
     * <p>Through the {@code url1} producer each task sends three messages: one with replication
     * disabled, one restricted to clusters {@code r1} and {@code r3}, and one with default settings.
     * The consumer on {@code url1} must then receive three messages, the one on {@code url2} exactly
     * one, and the one on {@code url3} exactly two; "exactly" is enforced through
     * {@code drained()} on the respective consumer.
     *
     * @throws Exception declared for TestNG; task failures are reported through {@code Assert.fail}
     */
    @Test(enabled = false)
    public void testReplicationOverrides() throws Exception {
        log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");

        List<Future<Void>> pending = Lists.newArrayList();
        for (int i = 0; i < 10; i++) {
            final DestinationName topic = DestinationName.get(String.format("persistent://pulsar/global/ns/repltopic-%d", i));
            Callable<Void> task = () -> {
                ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(url1, topic);
                log.info("--- Starting producer --- " + url1);
                ReplicatorTestBase.MessageProducer producer2 = new ReplicatorTestBase.MessageProducer(url2, topic);
                log.info("--- Starting producer --- " + url2);
                ReplicatorTestBase.MessageProducer producer3 = new ReplicatorTestBase.MessageProducer(url3, topic);
                log.info("--- Starting producer --- " + url3);
                ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(url1, topic);
                log.info("--- Starting Consumer --- " + url1);
                ReplicatorTestBase.MessageConsumer consumer2 = new ReplicatorTestBase.MessageConsumer(url2, topic);
                log.info("--- Starting Consumer --- " + url2);
                ReplicatorTestBase.MessageConsumer consumer3 = new ReplicatorTestBase.MessageConsumer(url3, topic);
                log.info("--- Starting Consumer --- " + url3);
                try {
                    // One message per override flavour, all published through url1.
                    producer1.produce(1, MessageBuilder.create().disableReplication());
                    producer1.produce(1, MessageBuilder.create().setReplicationClusters(Lists.newArrayList("r1", "r3")));
                    producer1.produce(1);

                    consumer1.receive(3);
                    consumer2.receive(1);
                    if (!consumer2.drained()) {
                        throw new Exception("consumer2 - unexpected message in queue");
                    }
                    consumer3.receive(2);
                    if (!consumer3.drained()) {
                        throw new Exception("consumer3 - unexpected message in queue");
                    }
                } finally {
                    // Close the clients even when an expectation above fails.
                    producer1.close();
                    producer2.close();
                    producer3.close();
                    consumer1.close();
                    consumer2.close();
                    consumer3.close();
                }
                return null;
            };
            pending.add(executor.submit(task));
        }

        for (Future<Void> outcome : pending) {
            try {
                outcome.get();
            } catch (Exception e) {
                log.error("exception in getting future result ", e);
                // Keep the cause so the report shows where the task actually failed.
                Assert.fail(String.format("replication test failed with %s exception", e.getMessage()), e);
            }
        }
    }

    /**
     * Tries to open, and immediately close, a consumer on {@code url2} for topic
     * {@code persistent://pulsar/global/ns/res-cons-id}, passing {@code "pulsar.repl."} as the third
     * constructor argument. Any exception raised along the way is tolerated.
     *
     * <p>NOTE(review): as written the test passes whether or not the consumer can be created.
     * Presumably creation is expected to be rejected (the same prefix is used for replicator cursor
     * names elsewhere in this class) — confirm, and add an explicit assertion if so.
     *
     * @throws Exception declared for TestNG; nothing inside the guarded section escapes
     */
    @Test(enabled = true)
    public void testFailures() throws Exception {
        log.info("--- Starting ReplicatorTest::testFailures ---");
        try {
            final DestinationName reservedIdTopic = DestinationName.get("persistent://pulsar/global/ns/res-cons-id");
            ReplicatorTestBase.MessageConsumer reservedIdConsumer =
                    new ReplicatorTestBase.MessageConsumer(url2, reservedIdTopic, "pulsar.repl.");
            reservedIdConsumer.close();
        } catch (Exception ignored) {
            // Deliberately tolerated; see the review note in the method documentation.
        }
    }

    /**
     * Produces two messages through {@code url1}, skips two messages on the first replicator of the
     * topic (as seen by {@code pulsar1}) and expects {@code peekNthMessage(1)} to yield {@code null}
     * within 50 ms.
     *
     * <p>A consumer on {@code url3} stays open for the duration of the test.
     *
     * @throws Exception on client errors or if the peek does not complete in time
     */
    @Test
    public void testReplicatePeekAndSkip() throws Exception {
        final DestinationName topicName = DestinationName.get("persistent://pulsar/global/ns/peekAndSeekTopic");

        ReplicatorTestBase.MessageProducer producer = new ReplicatorTestBase.MessageProducer(url1, topicName);
        ReplicatorTestBase.MessageConsumer consumer = null;
        try {
            try {
                consumer = new ReplicatorTestBase.MessageConsumer(url3, topicName);
                producer.produce(2);
            } finally {
                // The producer is only needed to seed the topic.
                producer.close();
            }

            PersistentTopic topic = pulsar1.getBrokerService().getTopicReference(topicName.toString());
            PersistentReplicator replicator =
                    (PersistentReplicator) topic.getReplicators().get(topic.getReplicators().keys().get(0));
            replicator.skipMessages(2);
            Assert.assertNull((Entry) replicator.peekNthMessage(1).get(50L, TimeUnit.MILLISECONDS));
        } finally {
            // Close the consumer even when the lookup or the assertion fails.
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    /**
     * Produces two messages through {@code url1}, then, on a Mockito spy of the topic's first
     * replicator (as seen by {@code pulsar1}), simulates a failed read, clears the backlog, refreshes
     * the rates, expires messages and expects the reported {@code replicationBacklog} to be zero.
     *
     * <p>A consumer on {@code url3} stays open for the duration of the test.
     *
     * @throws Exception on client errors or if clearing the backlog fails
     */
    @Test
    public void testReplicatorClearBacklog() throws Exception {
        final DestinationName topicName = DestinationName.get("persistent://pulsar/global/ns/clearBacklogTopic");

        ReplicatorTestBase.MessageProducer producer = new ReplicatorTestBase.MessageProducer(url1, topicName);
        ReplicatorTestBase.MessageConsumer consumer = null;
        try {
            try {
                consumer = new ReplicatorTestBase.MessageConsumer(url3, topicName);
                producer.produce(2);
            } finally {
                // The producer is only needed to seed the topic.
                producer.close();
            }

            PersistentTopic topic = pulsar1.getBrokerService().getTopicReference(topicName.toString());
            PersistentReplicator replicator = (PersistentReplicator) Mockito
                    .spy(topic.getReplicators().get(topic.getReplicators().keys().get(0)));
            replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null);
            replicator.clearBacklog().get();
            Thread.sleep(100L);
            replicator.updateRates();
            replicator.expireMessages(1);
            // assertEquals reports the actual backlog on failure, unlike assertTrue(x == 0).
            Assert.assertEquals(replicator.getStats().replicationBacklog, 0L);
        } finally {
            // Close the consumer even when a step above fails.
            if (consumer != null) {
                consumer.close();
            }
        }
    }

    /**
     * Produces and consumes two messages on {@code resetrepltopic-0} through {@code url1}, then
     * checks that resetting the cursor of subscription {@code sub-id} to the current time through
     * {@code admin1} does not throw.
     *
     * @throws Exception if the cursor reset fails
     */
    @Test(enabled = true)
    public void testResetCursorNotFail() throws Exception {
        log.info("--- Starting ReplicatorTest::testResetCursorNotFail ---");

        TreeSet<String> topicNames = new TreeSet<>();
        List<Future<Void>> pending = Lists.newArrayList();
        // Single iteration; the loop shape mirrors the other tests of this class.
        for (int i = 0; i < 1; i++) {
            final DestinationName topic = DestinationName.get(String.format("persistent://pulsar/global/ns/resetrepltopic-%d", i));
            topicNames.add(topic.toString());
            Callable<Void> task = () -> {
                ReplicatorTestBase.MessageProducer producer = new ReplicatorTestBase.MessageProducer(url1, topic);
                try {
                    log.info("--- Starting producer --- " + url1);
                    ReplicatorTestBase.MessageConsumer consumer = new ReplicatorTestBase.MessageConsumer(url1, topic);
                    try {
                        log.info("--- Starting Consumer --- " + url1);
                        producer.produce(2);
                        consumer.receive(2);
                    } finally {
                        // Release the consumer even when produce/receive fails.
                        consumer.close();
                    }
                } finally {
                    producer.close();
                }
                return null;
            };
            pending.add(executor.submit(task));
        }

        for (Future<Void> outcome : pending) {
            try {
                outcome.get();
            } catch (Exception e) {
                log.error("exception in getting future result ", e);
                // Keep the cause so the report shows where the task actually failed.
                Assert.fail(String.format("replication test failed with %s exception", e.getMessage()), e);
            }
        }

        admin1.persistentTopics().resetCursor(topicNames.first(), "sub-id", System.currentTimeMillis());
    }

    /**
     * For three topics in parallel, checks that batches of ten messages produced through
     * {@code url1} and then through {@code url2} are received by consumers on {@code url1},
     * {@code url2} and {@code url3}.
     *
     * <p>All producers are constructed with {@code true} as third argument (presumably enabling
     * batching — confirm against {@code ReplicatorTestBase.MessageProducer}). Each task must finish
     * within five seconds of the test starting to wait on it.
     *
     * @throws Exception declared for TestNG; task failures are reported through {@code Assert.fail}
     */
    @Test(enabled = true)
    public void testReplicationForBatchMessages() throws Exception {
        log.info("--- Starting ReplicatorTest::testReplicationForBatchMessages ---");

        List<Future<Void>> pending = Lists.newArrayList();
        for (int i = 0; i < 3; i++) {
            final DestinationName topic = DestinationName.get(String.format("persistent://pulsar/global/ns/repltopicbatch-%d", i));
            Callable<Void> task = () -> {
                ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(url1, topic, true);
                log.info("--- Starting producer --- " + url1);
                ReplicatorTestBase.MessageProducer producer2 = new ReplicatorTestBase.MessageProducer(url2, topic, true);
                log.info("--- Starting producer --- " + url2);
                // Never used for producing, but held so that it can be closed below.
                ReplicatorTestBase.MessageProducer producer3 = new ReplicatorTestBase.MessageProducer(url3, topic, true);
                log.info("--- Starting producer --- " + url3);
                ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(url1, topic);
                log.info("--- Starting Consumer --- " + url1);
                ReplicatorTestBase.MessageConsumer consumer2 = new ReplicatorTestBase.MessageConsumer(url2, topic);
                log.info("--- Starting Consumer --- " + url2);
                ReplicatorTestBase.MessageConsumer consumer3 = new ReplicatorTestBase.MessageConsumer(url3, topic);
                log.info("--- Starting Consumer --- " + url3);
                try {
                    // Batch produced through url1, consumed everywhere.
                    producer1.produceBatch(10);
                    consumer1.receive(10);
                    consumer2.receive(10);
                    consumer3.receive(10);

                    // Batch produced through url2, consumed everywhere.
                    producer2.produceBatch(10);
                    consumer1.receive(10);
                    consumer2.receive(10);
                    consumer3.receive(10);
                } finally {
                    // Previously none of these six clients was ever closed.
                    producer1.close();
                    producer2.close();
                    producer3.close();
                    consumer1.close();
                    consumer2.close();
                    consumer3.close();
                }
                return null;
            };
            pending.add(executor.submit(task));
        }

        for (Future<Void> outcome : pending) {
            try {
                outcome.get(5L, TimeUnit.SECONDS);
            } catch (Exception e) {
                log.error("exception in getting future result ", e);
                // Keep the cause so the report shows where the task actually failed.
                Assert.fail(String.format("replication test failed with %s exception", e.getMessage()), e);
            }
        }
    }

    /**
     * Deletes the cursor {@code "pulsar.repl." + cluster} of the topic's first replicator directly on
     * the managed ledger, then invokes the private {@code PersistentTopic.removeReplicator(cluster)}
     * via reflection. If that removal completes normally, the topic must no longer expose a
     * replicator for the cluster.
     *
     * <p>When the removal completes exceptionally the replicator check is skipped, exactly as in the
     * original callback-based form of this test; the failure is only logged.
     *
     * @throws Exception on reflection/client errors or if an asynchronous step does not complete
     *                   within ten seconds
     */
    @Test
    public void testDeleteReplicatorFailure() throws Exception {
        log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
        final String topicName = "persistent://pulsar/global/ns/repltopicbatch";

        ReplicatorTestBase.MessageProducer producer =
                new ReplicatorTestBase.MessageProducer(url1, DestinationName.get(topicName));
        try {
            final PersistentTopic topic = pulsar1.getBrokerService().getTopicReference(topicName);
            final String cluster = (String) topic.getReplicators().keys().get(0);
            ManagedLedgerImpl ledger = topic.getManagedLedger();

            // Remove the replicator's cursor behind its back; either outcome releases the latch.
            final CountDownLatch cursorDeleted = new CountDownLatch(1);
            ledger.asyncDeleteCursor("pulsar.repl." + cluster, new AsyncCallbacks.DeleteCursorCallback() {
                @Override
                public void deleteCursorComplete(Object ctx) {
                    cursorDeleted.countDown();
                }

                @Override
                public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
                    cursorDeleted.countDown();
                }
            }, null);
            Assert.assertTrue(cursorDeleted.await(10L, TimeUnit.SECONDS), "cursor deletion callback was not invoked");

            Method removeReplicator = PersistentTopic.class.getDeclaredMethod("removeReplicator", String.class);
            removeReplicator.setAccessible(true);
            @SuppressWarnings("unchecked") // reflective call; the result type cannot be checked at runtime
            CompletableFuture<Void> removal = (CompletableFuture<Void>) removeReplicator.invoke(topic, cluster);

            // Wait for the removal and assert on the test thread: an assertion inside an un-awaited
            // thenApply callback could never fail the test.
            Throwable removalFailure = removal.handle((ignored, failure) -> failure).get(10L, TimeUnit.SECONDS);
            if (removalFailure == null) {
                Assert.assertNull(topic.getPersistentReplicator(cluster));
            } else {
                log.info("removeReplicator completed exceptionally; replicator check skipped", removalFailure);
            }
        } finally {
            producer.close();
        }
    }

    /**
     * Closes {@code pulsar2} and {@code pulsar3}, disconnects the topic's first replicator on
     * {@code pulsar1} and expects the replicator's private {@code producer} field (declared on
     * {@link AbstractReplicator}, read via reflection) to be {@code null} 100 ms later.
     *
     * <p>The two closed services are not restarted here, so tests running afterwards cannot use
     * them (presumably the reason for {@code priority = 5}).
     *
     * @throws Exception on reflection, client or service shutdown errors
     */
    @Test(priority = 5)
    public void testReplicatorProducerClosing() throws Exception {
        // Previously logged the name of testDeleteReplicatorFailure (copy/paste slip).
        log.info("--- Starting ReplicatorTest::testReplicatorProducerClosing ---");
        final String topicName = "persistent://pulsar/global/ns/repltopicbatch";

        ReplicatorTestBase.MessageProducer producer =
                new ReplicatorTestBase.MessageProducer(url1, DestinationName.get(topicName));
        try {
            PersistentTopic topic = pulsar1.getBrokerService().getTopicReference(topicName);
            Replicator replicator = topic.getPersistentReplicator((String) topic.getReplicators().keys().get(0));

            pulsar2.close();
            pulsar3.close();
            replicator.disconnect(false);
            Thread.sleep(100L);

            Field producerField = AbstractReplicator.class.getDeclaredField("producer");
            producerField.setAccessible(true);
            ProducerImpl replicatorProducer = (ProducerImpl) producerField.get(replicator);
            Assert.assertNull(replicatorProducer);
        } finally {
            // The producer used to be left open for the rest of the test run.
            producer.close();
        }
    }

    /**
     * For each of the retention policies {@code producer_exception} and
     * {@code producer_request_hold}, tracks the {@code replicationBacklog} reported by the
     * {@code r2} replicator of a fresh topic in {@code pulsar/global/ns1} while the namespace's
     * backlog quota is changed.
     *
     * <p>Per policy: the quota is set to 1 MiB, one message is produced through {@code url1}, the
     * quota is lowered to 1 byte, and after six seconds the backlog must be 0. A second message is
     * then produced and the backlog must read 1 after 500 ms; once the consumer on {@code url2} has
     * received two messages it must be 0 again.
     *
     * @throws Exception on admin/client errors or interruption
     */
    @Test(timeOut = 60000, enabled = true, priority = -1)
    public void testResumptionAfterBacklogRelaxed() throws Exception {
        List<BacklogQuota.RetentionPolicy> policies = Lists.newArrayList(
                BacklogQuota.RetentionPolicy.producer_exception,
                BacklogQuota.RetentionPolicy.producer_request_hold);

        for (BacklogQuota.RetentionPolicy policy : policies) {
            admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1048576L, policy));
            Thread.sleep(200L);

            // Timestamp suffix gives each policy its own topic.
            DestinationName topic = DestinationName
                    .get(String.format("persistent://pulsar/global/ns1/%s-%d", policy, System.currentTimeMillis()));

            ReplicatorTestBase.MessageProducer producer = new ReplicatorTestBase.MessageProducer(url1, topic);
            try {
                ReplicatorTestBase.MessageConsumer consumer = new ReplicatorTestBase.MessageConsumer(url2, topic);
                try {
                    Replicator replicator = pulsar1.getBrokerService().getTopicReference(topic.toString())
                            .getPersistentReplicator("r2");

                    producer.produce(1);
                    Thread.sleep(500L);

                    admin1.namespaces().setBacklogQuota("pulsar/global/ns1", new BacklogQuota(1L, policy));
                    Thread.sleep(6000L);
                    Assert.assertEquals(replicator.getStats().replicationBacklog, 0L);

                    producer.produce(1);
                    Thread.sleep(500L);
                    Assert.assertEquals(replicator.getStats().replicationBacklog, 1L);

                    consumer.receive(1);
                    consumer.receive(1);
                    Assert.assertEquals(replicator.getStats().replicationBacklog, 0L);
                } finally {
                    // A failed assertion must not leak the clients into the next policy's iteration.
                    consumer.close();
                }
            } finally {
                producer.close();
            }
        }
    }

    /**
     * Closes the cursor of the {@code r2} replicator (read from the private {@code cursor} field via
     * reflection), checks that a subsequent {@code asyncReadEntriesOrWait} on that cursor fails with
     * {@link ManagedLedgerException.CursorAlreadyClosedException}, reports the same failure to the
     * replicator, and expects the replicator's private {@code producer} field to be {@code null} one
     * second later.
     *
     * @throws Exception on reflection/client errors or interruption
     */
    @Test(timeOut = 5000)
    public void testCloseReplicatorStartProducer() throws Exception {
        final DestinationName topicName = DestinationName.get("persistent://pulsar/global/ns1/closeCursor");

        ReplicatorTestBase.MessageProducer producer1 = new ReplicatorTestBase.MessageProducer(url1, topicName);
        ReplicatorTestBase.MessageConsumer consumer1 = new ReplicatorTestBase.MessageConsumer(url1, topicName);
        ReplicatorTestBase.MessageConsumer consumer2 = new ReplicatorTestBase.MessageConsumer(url2, topicName);
        try {
            PersistentReplicator replicator = pulsar1.getBrokerService().getTopicReference(topicName.toString())
                    .getPersistentReplicator("r2");

            Field cursorField = PersistentReplicator.class.getDeclaredField("cursor");
            cursorField.setAccessible(true);
            ManagedCursor cursor = (ManagedCursor) cursorField.get(replicator);
            cursor.close();

            // The callback only records its outcome; the latch publishes it to the test thread,
            // where the assertions can actually fail the test.
            final CountDownLatch readFinished = new CountDownLatch(1);
            final boolean[] readSucceeded = new boolean[1];
            final ManagedLedgerException[] readFailure = new ManagedLedgerException[1];

            producer1.produce(10);
            cursor.asyncReadEntriesOrWait(10, new AsyncCallbacks.ReadEntriesCallback() {
                @Override
                public void readEntriesComplete(List<Entry> entries, Object ctx) {
                    readSucceeded[0] = true;
                    readFinished.countDown();
                }

                @Override
                public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                    readFailure[0] = exception;
                    readFinished.countDown();
                }
            }, null);

            // Tell the replicator that its read failed because the cursor is closed.
            replicator.readEntriesFailed(
                    new ManagedLedgerException.CursorAlreadyClosedException("Cursor already closed exception"), null);

            Assert.assertTrue(readFinished.await(2L, TimeUnit.SECONDS), "read callback was not invoked");
            Assert.assertFalse(readSucceeded[0], "it should have been failed");
            Assert.assertTrue(readFailure[0] instanceof ManagedLedgerException.CursorAlreadyClosedException);

            // Fixed wait before inspecting the replicator's producer.
            Thread.sleep(1000L);
            Field producerField = AbstractReplicator.class.getDeclaredField("producer");
            producerField.setAccessible(true);
            Assert.assertNull(producerField.get(replicator));
        } finally {
            // Close the clients even when a step above fails.
            producer1.close();
            consumer1.close();
            consumer2.close();
        }
    }
}
