package org.apache.pulsar.broker.service;

import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/**
 * Broker tests exercising the replicator between the two brokers/clients/admins
 * ({@code pulsar1}/{@code pulsar2}, {@code client1}/{@code client2}, {@code admin1}/{@code admin2})
 * inherited from {@link OneWayReplicatorTestBase}.
 *
 * <p>NOTE(review): the tests publish through {@code client1} and observe through {@code client2} /
 * {@code admin2}, so the base class presumably configures replication from cluster 1 to cluster 2 —
 * confirm against {@link OneWayReplicatorTestBase}.
 */
@Test(groups = {"broker"})
public class OneWayReplicatorTest extends OneWayReplicatorTestBase {

    /** Delegates to the base-class setup; re-declared so the TestNG lifecycle annotation applies here. */
    @Override
    @BeforeClass(alwaysRun = true, timeOut = 300000)
    public void setup() throws Exception {
        super.setup();
    }

    /** Delegates to the base-class cleanup; re-declared so the TestNG lifecycle annotation applies here. */
    @Override
    @AfterClass(alwaysRun = true, timeOut = 300000)
    public void cleanup() throws Exception {
        super.cleanup();
    }

    /**
     * Blocks (via Awaitility) until the topic is loaded on {@code pulsar2} and has at least one
     * producer attached.
     *
     * @param topicName fully qualified name of the topic to look up on {@code pulsar2}
     */
    private void waitReplicatorStarted(String topicName) {
        Awaitility.await().untilAsserted(() -> {
            // createIfMissing = false: only observe the topic, never create it from here.
            Optional<?> remoteTopic = this.pulsar2.getBrokerService().getTopic(topicName, false).get();
            Assert.assertTrue(remoteTopic.isPresent());
            Assert.assertFalse(((PersistentTopic) remoteTopic.get()).getProducers().isEmpty());
        });
    }

    /**
     * Reflectively replaces the {@code producer} field declared on {@link AbstractReplicator}.
     *
     * <p>The previous value is read and the new value written while holding the replicator's
     * monitor, so the swap is atomic with respect to any other code synchronizing on the same
     * replicator instance.
     *
     * @param replicator  replicator whose internal producer is swapped
     * @param newProducer producer to inject
     * @return the producer that was installed before the swap
     * @throws Exception if the field cannot be located or accessed reflectively
     */
    private ProducerImpl overrideProducerForReplicator(AbstractReplicator replicator, ProducerImpl newProducer)
            throws Exception {
        Field producerField = AbstractReplicator.class.getDeclaredField("producer");
        producerField.setAccessible(true);
        synchronized (replicator) {
            ProducerImpl previousProducer = (ProducerImpl) producerField.get(replicator);
            producerField.set(replicator, newProducer);
            return previousProducer;
        }
    }

    /**
     * Publishes one message through {@code client1}, expects it to be received through
     * {@code client2}, and verifies that the topic stats reported by {@code admin2} list at least
     * one publisher or replication entry.
     */
    @Test
    public void testReplicatorProducerStatInTopic() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        final byte[] payload = "test".getBytes(StandardCharsets.UTF_8);

        // try-with-resources so both clients are released even when an assertion fails.
        try (Producer<byte[]> producer = this.client1.newProducer().topic(topicName).create();
                Consumer<byte[]> consumer = this.client2.newConsumer().topic(topicName)
                        .subscriptionName("subscribe_1").subscribe()) {
            producer.newMessage().value(payload).send();
            this.pulsar1.getBrokerService().checkReplicationPolicies();

            // A timed receive may yield no message; fail with a clear message instead of an NPE.
            Message<byte[]> received = consumer.receive(10, TimeUnit.SECONDS);
            Assert.assertNotNull(received, "No message received on " + topicName + " within 10 seconds");
            Assert.assertEquals(received.getValue(), payload);

            TopicStats remoteStats = this.admin2.topics().getStats(topicName);
            Assert.assertTrue(remoteStats.getPublishers().size() + remoteStats.getReplication().size() > 0);
        }

        cleanupTopics(() -> {
            this.admin1.topics().delete(topicName);
            this.admin2.topics().delete(topicName);
        });
    }

    /**
     * Creates a producer through {@code client1} and a consumer through {@code client2} on the same
     * topic, then waits until the topic on {@code pulsar2} reports an attached producer.
     */
    @Test
    public void testCreateRemoteConsumerFirst() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");

        // try-with-resources so both clients are released even when the wait below times out.
        try (Producer<String> producer = this.client1.newProducer(Schema.STRING).topic(topicName).create();
                Consumer<String> consumer = this.client2.newConsumer(Schema.STRING).topic(topicName)
                        .subscriptionName("s1").subscribe()) {
            waitReplicatorStarted(topicName);
        }

        cleanupTopics(() -> {
            this.admin1.topics().delete(topicName);
            this.admin2.topics().delete(topicName);
        });
    }

    /**
     * Injects a mocked producer whose {@code closeAsync()} fails into the topic's replicator,
     * unloads the topic, restores the real producer, and then waits until the replicator reports
     * that it is no longer connected.
     */
    @Test
    public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception {
        final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp_");
        this.admin1.topics().createNonPartitionedTopic(topicName);
        waitReplicatorStarted(topicName);

        PersistentTopic localTopic = (PersistentTopic) this.pulsar1.getBrokerService()
                .getTopic(topicName, false)
                .join()
                .orElseThrow(() -> new AssertionError("Topic is not loaded on pulsar1: " + topicName));
        PersistentReplicator replicator =
                (PersistentReplicator) localTopic.getReplicators().values().iterator().next();

        // Producer stub whose close always completes exceptionally.
        ProducerImpl failingProducer = Mockito.mock(ProducerImpl.class);
        Mockito.when(failingProducer.closeAsync())
                .thenReturn(CompletableFuture.failedFuture(new Exception("mocked ex")));

        ProducerImpl originalProducer = overrideProducerForReplicator(replicator, failingProducer);
        try {
            this.admin1.topics().unload(topicName);
        } finally {
            // Always put the real producer back so a failed unload does not leave the mock installed.
            overrideProducerForReplicator(replicator, originalProducer);
        }

        Awaitility.await().untilAsserted(() -> Assert.assertFalse(replicator.isConnected()));

        cleanupTopics(() -> {
            this.admin1.topics().delete(topicName);
            this.admin2.topics().delete(topicName);
        });
    }
}
