package org.apache.pulsar.broker.service;

import io.netty.channel.DefaultEventLoop;
import io.netty.util.internal.DefaultPriorityQueue;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/AbstractReplicatorTest.class */
public class AbstractReplicatorTest {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/AbstractReplicatorTest$ReplicatorInTest.class */
    public static class ReplicatorInTest extends AbstractReplicator {
        public ReplicatorInTest(String str, Topic topic, String str2, String str3, String str4, BrokerService brokerService, PulsarClientImpl pulsarClientImpl) throws PulsarServerException {
            super(str, topic, str2, str3, str4, brokerService, pulsarClientImpl);
        }

        protected String getProducerName() {
            return "pulsar.repl.producer";
        }

        protected void setProducerAndTriggerReadEntries(Producer<byte[]> producer) {
        }

        protected Position getReplicatorReadPosition() {
            return PositionImpl.EARLIEST;
        }

        public ReplicatorStatsImpl getStats() {
            return null;
        }

        public void updateRates() {
        }

        public boolean isConnected() {
            return false;
        }

        public long getNumberOfEntriesInBacklog() {
            return 0L;
        }

        protected void disableReplicatorRead() {
        }
    }

    @Test
    public void testRetryStartProducerStoppedByTopicRemove() throws Exception {
        DefaultEventLoop defaultEventLoop = new DefaultEventLoop();
        try {
            ServiceConfiguration serviceConfiguration = (ServiceConfiguration) Mockito.mock(ServiceConfiguration.class);
            PulsarService pulsarService = (PulsarService) Mockito.mock(PulsarService.class);
            BrokerService brokerService = (BrokerService) Mockito.mock(BrokerService.class);
            Topic topic = (Topic) Mockito.mock(Topic.class);
            ConnectionPool connectionPool = (ConnectionPool) Mockito.mock(ConnectionPool.class);
            PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) Mockito.mock(PulsarClientImpl.class);
            Mockito.when(pulsarClientImpl.getCnxPool()).thenReturn(connectionPool);
            PulsarClientImpl pulsarClientImpl2 = (PulsarClientImpl) Mockito.mock(PulsarClientImpl.class);
            Mockito.when(pulsarClientImpl2.getCnxPool()).thenReturn(connectionPool);
            ProducerBuilder producerBuilder = (ProducerBuilder) Mockito.mock(ProducerBuilder.class);
            ConcurrentOpenHashMap concurrentOpenHashMap = new ConcurrentOpenHashMap();
            Mockito.when(brokerService.executor()).thenReturn(defaultEventLoop);
            Mockito.when(brokerService.getTopics()).thenReturn(concurrentOpenHashMap);
            Mockito.when(pulsarClientImpl2.newProducer((Schema) Mockito.any(Schema.class))).thenReturn(producerBuilder);
            Mockito.when(brokerService.pulsar()).thenReturn(pulsarService);
            Mockito.when(pulsarService.getClient()).thenReturn(pulsarClientImpl);
            Mockito.when(pulsarService.getConfiguration()).thenReturn(serviceConfiguration);
            Mockito.when(Integer.valueOf(serviceConfiguration.getReplicationProducerQueueSize())).thenReturn(100);
            Mockito.when(topic.getName()).thenReturn("remoteTopicName");
            Mockito.when(producerBuilder.topic((String) Mockito.any())).thenReturn(producerBuilder);
            Mockito.when(producerBuilder.messageRoutingMode((MessageRoutingMode) Mockito.any())).thenReturn(producerBuilder);
            Mockito.when(producerBuilder.enableBatching(Mockito.anyBoolean())).thenReturn(producerBuilder);
            Mockito.when(producerBuilder.sendTimeout(Mockito.anyInt(), (TimeUnit) Mockito.any())).thenReturn(producerBuilder);
            Mockito.when(producerBuilder.maxPendingMessages(Mockito.anyInt())).thenReturn(producerBuilder);
            Mockito.when(producerBuilder.producerName(Mockito.anyString())).thenReturn(producerBuilder);
            Mockito.when(producerBuilder.create()).thenThrow(new Throwable[]{new RuntimeException("mocked ex")});
            Mockito.when(producerBuilder.createAsync()).thenReturn(CompletableFuture.failedFuture(new RuntimeException("mocked ex")));
            ReplicatorInTest replicatorInTest = new ReplicatorInTest("localCluster", topic, "remoteCluster", "remoteTopicName", "pulsar.repl", brokerService, pulsarClientImpl2);
            replicatorInTest.startProducer();
            replicatorInTest.terminate();
            Awaitility.await().atMost(10L, TimeUnit.SECONDS).untilAsserted(() -> {
                AtomicInteger atomicInteger = new AtomicInteger();
                CountDownLatch countDownLatch = new CountDownLatch(1);
                defaultEventLoop.execute(() -> {
                    synchronized (replicatorInTest) {
                        atomicInteger.set(((LinkedBlockingQueue) WhiteboxImpl.getInternalState(defaultEventLoop, "taskQueue")).size() + ((DefaultPriorityQueue) WhiteboxImpl.getInternalState(defaultEventLoop, "scheduledTaskQueue")).size());
                        countDownLatch.countDown();
                    }
                });
                countDownLatch.await();
                Assert.assertEquals(atomicInteger.get(), 0);
            });
            if (Collections.singletonList(defaultEventLoop).get(0) != null) {
                defaultEventLoop.shutdownNow();
            }
        } catch (Throwable th) {
            if (Collections.singletonList(defaultEventLoop).get(0) != null) {
                defaultEventLoop.shutdownNow();
            }
            throw th;
        }
    }
}
