package org.apache.pulsar.broker.service.v1;

import com.google.common.collect.Sets;
import java.net.URL;
import java.util.ArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.test.PortManager;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

/* loaded from: input_file:org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase.class */
public class V1_ReplicatorTestBase {
    URL url1;
    URL urlTls1;
    PulsarService pulsar1;
    BrokerService ns1;
    PulsarAdmin admin1;
    LocalBookkeeperEnsemble bkEnsemble1;
    URL url2;
    URL urlTls2;
    PulsarService pulsar2;
    BrokerService ns2;
    PulsarAdmin admin2;
    LocalBookkeeperEnsemble bkEnsemble2;
    URL url3;
    URL urlTls3;
    PulsarService pulsar3;
    BrokerService ns3;
    PulsarAdmin admin3;
    LocalBookkeeperEnsemble bkEnsemble3;
    ZookeeperServerTest globalZkS;
    static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
    protected static final String TLS_SERVER_CERT_FILE_PATH = "./src/test/resources/certificate/server.crt";
    protected static final String TLS_SERVER_KEY_FILE_PATH = "./src/test/resources/certificate/server.key";
    private static final Logger log = LoggerFactory.getLogger(V1_ReplicatorTestBase.class);
    ServiceConfiguration config1 = new ServiceConfiguration();
    ServiceConfiguration config2 = new ServiceConfiguration();
    ServiceConfiguration config3 = new ServiceConfiguration();
    ExecutorService executor = new ThreadPoolExecutor(TIME_TO_CHECK_BACKLOG_QUOTA, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue());

    /* loaded from: input_file:org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase$MessageConsumer.class */
    static class MessageConsumer {
        final URL url;
        final String namespace;
        final String topicName;
        final PulsarClient client;
        final Consumer<byte[]> consumer;

        /* JADX INFO: Access modifiers changed from: package-private */
        public MessageConsumer(URL url, TopicName topicName) throws Exception {
            this(url, topicName, "sub-id");
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public MessageConsumer(URL url, TopicName topicName, String str) throws Exception {
            this.url = url;
            this.namespace = topicName.getNamespace();
            this.topicName = topicName.toString();
            this.client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                this.consumer = this.client.newConsumer().topic(new String[]{this.topicName}).subscriptionName(str).subscribe();
            } catch (Exception e) {
                this.client.close();
                throw e;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void receive(int i) throws Exception {
            V1_ReplicatorTestBase.log.info("Start receiving messages");
            for (int i2 = 0; i2 < i; i2++) {
                Message receive = this.consumer.receive();
                this.consumer.acknowledge(receive);
                String str = new String(receive.getData());
                Assert.assertEquals(str, "test-" + i2);
                V1_ReplicatorTestBase.log.info("Received message {}", str);
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean drained() throws Exception {
            return this.consumer.receive(0, TimeUnit.MICROSECONDS) == null;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void close() throws Exception {
            this.client.close();
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/service/v1/V1_ReplicatorTestBase$MessageProducer.class */
    static class MessageProducer {
        URL url;
        String namespace;
        String topicName;
        PulsarClient client;
        Producer<byte[]> producer;

        /* JADX INFO: Access modifiers changed from: package-private */
        public MessageProducer(URL url, TopicName topicName) throws Exception {
            this.url = url;
            this.namespace = topicName.getNamespace();
            this.topicName = topicName.toString();
            this.client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            this.producer = this.client.newProducer().topic(this.topicName).create();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public MessageProducer(URL url, TopicName topicName, boolean z) throws Exception {
            this.url = url;
            this.namespace = topicName.getNamespace();
            this.topicName = topicName.toString();
            this.client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            ProducerBuilder producerBuilder = this.client.newProducer().topic(this.topicName);
            if (z) {
                producerBuilder.enableBatching(true);
                producerBuilder.batchingMaxPublishDelay(1L, TimeUnit.SECONDS);
                producerBuilder.batchingMaxMessages(V1_ReplicatorTestBase.TIME_TO_CHECK_BACKLOG_QUOTA);
            }
            this.producer = producerBuilder.create();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void produceBatch(int i) throws Exception {
            V1_ReplicatorTestBase.log.info("Start sending batch messages");
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < i; i2++) {
                arrayList.add(this.producer.sendAsync(("test-" + i2).getBytes()));
                V1_ReplicatorTestBase.log.info("queued message {}", "test-" + i2);
            }
            FutureUtil.waitForAll(arrayList).get();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void produce(int i) throws Exception {
            V1_ReplicatorTestBase.log.info("Start sending messages");
            for (int i2 = 0; i2 < i; i2++) {
                this.producer.send(("test-" + i2).getBytes());
                V1_ReplicatorTestBase.log.info("Sent message {}", "test-" + i2);
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void produce(int i, MessageBuilder<byte[]> messageBuilder) throws Exception {
            V1_ReplicatorTestBase.log.info("Start sending messages");
            for (int i2 = 0; i2 < i; i2++) {
                String str = new String("test-builder-" + i2);
                messageBuilder.setContent(str.getBytes());
                this.producer.send(messageBuilder.build());
                V1_ReplicatorTestBase.log.info("Sent message {}", str);
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void close() throws Exception {
            this.client.close();
        }
    }

    public int getBrokerServicePurgeInactiveFrequency() {
        return 60;
    }

    public boolean isBrokerServicePurgeInactiveTopic() {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setup() throws Exception {
        log.info("--- Starting V1_ReplicatorTestBase::setup ---");
        int nextFreePort = PortManager.nextFreePort();
        this.globalZkS = new ZookeeperServerTest(nextFreePort);
        this.globalZkS.start();
        int nextFreePort2 = PortManager.nextFreePort();
        this.bkEnsemble1 = new LocalBookkeeperEnsemble(3, nextFreePort2, PortManager.nextFreePort());
        this.bkEnsemble1.start();
        int nextFreePort3 = PortManager.nextFreePort();
        int nextFreePort4 = PortManager.nextFreePort();
        this.config1.setClusterName("r1");
        this.config1.setAdvertisedAddress("localhost");
        this.config1.setWebServicePort(nextFreePort3);
        this.config1.setWebServicePortTls(nextFreePort4);
        this.config1.setZookeeperServers("127.0.0.1:" + nextFreePort2);
        this.config1.setConfigurationStoreServers("127.0.0.1:" + nextFreePort + "/foo");
        this.config1.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
        this.config1.setBrokerServicePurgeInactiveFrequencyInSeconds(inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
        this.config1.setBrokerServicePort(PortManager.nextFreePort());
        this.config1.setBrokerServicePortTls(PortManager.nextFreePort());
        this.config1.setTlsEnabled(true);
        this.config1.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.config1.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        this.config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
        this.config1.setDefaultNumberOfNamespaceBundles(1);
        this.pulsar1 = new PulsarService(this.config1);
        this.pulsar1.start();
        this.ns1 = this.pulsar1.getBrokerService();
        this.url1 = new URL("http://localhost:" + nextFreePort3);
        this.urlTls1 = new URL("https://localhost:" + nextFreePort4);
        this.admin1 = PulsarAdmin.builder().serviceHttpUrl(this.url1.toString()).build();
        int nextFreePort5 = PortManager.nextFreePort();
        this.bkEnsemble2 = new LocalBookkeeperEnsemble(3, nextFreePort5, PortManager.nextFreePort());
        this.bkEnsemble2.start();
        int nextFreePort6 = PortManager.nextFreePort();
        int nextFreePort7 = PortManager.nextFreePort();
        this.config2.setClusterName("r2");
        this.config2.setAdvertisedAddress("localhost");
        this.config2.setWebServicePort(nextFreePort6);
        this.config2.setWebServicePortTls(nextFreePort7);
        this.config2.setZookeeperServers("127.0.0.1:" + nextFreePort5);
        this.config2.setConfigurationStoreServers("127.0.0.1:" + nextFreePort + "/foo");
        this.config2.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
        this.config2.setBrokerServicePurgeInactiveFrequencyInSeconds(inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
        this.config2.setBrokerServicePort(PortManager.nextFreePort());
        this.config2.setBrokerServicePortTls(PortManager.nextFreePort());
        this.config2.setTlsEnabled(true);
        this.config2.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.config2.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        this.config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
        this.config2.setDefaultNumberOfNamespaceBundles(1);
        this.pulsar2 = new PulsarService(this.config2);
        this.pulsar2.start();
        this.ns2 = this.pulsar2.getBrokerService();
        this.url2 = new URL("http://localhost:" + nextFreePort6);
        this.urlTls2 = new URL("https://localhost:" + nextFreePort7);
        this.admin2 = PulsarAdmin.builder().serviceHttpUrl(this.url2.toString()).build();
        int nextFreePort8 = PortManager.nextFreePort();
        this.bkEnsemble3 = new LocalBookkeeperEnsemble(3, nextFreePort8, PortManager.nextFreePort());
        this.bkEnsemble3.start();
        int nextFreePort9 = PortManager.nextFreePort();
        int nextFreePort10 = PortManager.nextFreePort();
        this.config3.setClusterName("r3");
        this.config3.setAdvertisedAddress("localhost");
        this.config3.setWebServicePort(nextFreePort9);
        this.config3.setWebServicePortTls(nextFreePort10);
        this.config3.setZookeeperServers("127.0.0.1:" + nextFreePort8);
        this.config3.setConfigurationStoreServers("127.0.0.1:" + nextFreePort + "/foo");
        this.config3.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
        this.config3.setBrokerServicePurgeInactiveFrequencyInSeconds(inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
        this.config3.setBrokerServicePort(PortManager.nextFreePort());
        this.config3.setBrokerServicePortTls(PortManager.nextFreePort());
        this.config3.setTlsEnabled(true);
        this.config3.setTlsCertificateFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
        this.config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
        this.config3.setDefaultNumberOfNamespaceBundles(1);
        this.pulsar3 = new PulsarService(this.config3);
        this.pulsar3.start();
        this.ns3 = this.pulsar3.getBrokerService();
        this.url3 = new URL("http://localhost:" + nextFreePort9);
        this.urlTls3 = new URL("https://localhost:" + nextFreePort10);
        this.admin3 = PulsarAdmin.builder().serviceHttpUrl(this.url3.toString()).build();
        this.admin1.clusters().createCluster("r1", new ClusterData(this.url1.toString(), this.urlTls1.toString(), this.pulsar1.getBrokerServiceUrl(), this.pulsar1.getBrokerServiceUrlTls()));
        this.admin1.clusters().createCluster("r2", new ClusterData(this.url2.toString(), this.urlTls2.toString(), this.pulsar2.getBrokerServiceUrl(), this.pulsar2.getBrokerServiceUrlTls()));
        this.admin1.clusters().createCluster("r3", new ClusterData(this.url3.toString(), this.urlTls3.toString(), this.pulsar3.getBrokerServiceUrl(), this.pulsar3.getBrokerServiceUrlTls()));
        this.admin1.clusters().createCluster("global", new ClusterData("http://global:8080", "https://global:8443"));
        this.admin1.tenants().createTenant("pulsar", new TenantInfo(Sets.newHashSet(new String[]{"appid1", "appid2", "appid3"}), Sets.newHashSet(new String[]{"r1", "r2", "r3"})));
        this.admin1.namespaces().createNamespace("pulsar/global/ns");
        this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet(new String[]{"r1", "r2", "r3"}));
        this.admin1.namespaces().createNamespace("pulsar/global/ns1");
        this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns1", Sets.newHashSet(new String[]{"r1", "r2"}));
        Assert.assertEquals(this.admin2.clusters().getCluster("r1").getServiceUrl(), this.url1.toString());
        Assert.assertEquals(this.admin2.clusters().getCluster("r2").getServiceUrl(), this.url2.toString());
        Assert.assertEquals(this.admin2.clusters().getCluster("r3").getServiceUrl(), this.url3.toString());
        Assert.assertEquals(this.admin2.clusters().getCluster("r1").getBrokerServiceUrl(), this.pulsar1.getBrokerServiceUrl());
        Assert.assertEquals(this.admin2.clusters().getCluster("r2").getBrokerServiceUrl(), this.pulsar2.getBrokerServiceUrl());
        Assert.assertEquals(this.admin2.clusters().getCluster("r3").getBrokerServiceUrl(), this.pulsar3.getBrokerServiceUrl());
        Thread.sleep(100L);
        log.info("--- V1_ReplicatorTestBase::setup completed ---");
    }

    private int inSec(int i, TimeUnit timeUnit) {
        return (int) TimeUnit.SECONDS.convert(i, timeUnit);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdown() throws Exception {
        log.info("--- Shutting down ---");
        this.executor.shutdown();
        this.admin1.close();
        this.admin2.close();
        this.admin3.close();
        this.pulsar3.close();
        this.pulsar2.close();
        this.pulsar1.close();
        this.bkEnsemble1.stop();
        this.bkEnsemble2.stop();
        this.bkEnsemble3.stop();
        this.globalZkS.stop();
    }
}
