package org.apache.pulsar.broker.service;

import com.google.common.collect.Sets;
import com.google.common.io.Resources;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.net.URL;
import java.util.Optional;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.socks5.auth.DefaultPasswordAuthImpl;
import org.apache.pulsar.tests.TestRetrySupport;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

/* loaded from: input_file:org/apache/pulsar/broker/service/ReplicatorTestBase.class */
public abstract class ReplicatorTestBase extends TestRetrySupport {
    URL url1;
    URL urlTls1;
    PulsarService pulsar1;
    BrokerService ns1;
    PulsarAdmin admin1;
    LocalBookkeeperEnsemble bkEnsemble1;
    URL url2;
    URL urlTls2;
    PulsarService pulsar2;
    BrokerService ns2;
    PulsarAdmin admin2;
    LocalBookkeeperEnsemble bkEnsemble2;
    URL url3;
    URL urlTls3;
    PulsarService pulsar3;
    BrokerService ns3;
    PulsarAdmin admin3;
    LocalBookkeeperEnsemble bkEnsemble3;
    URL url4;
    URL urlTls4;
    PulsarService pulsar4;
    PulsarAdmin admin4;
    LocalBookkeeperEnsemble bkEnsemble4;
    ZookeeperServerTest globalZkS;
    ExecutorService executor;
    static final int TIME_TO_CHECK_BACKLOG_QUOTA = 5;
    protected static final String keyStoreType = "JKS";
    protected static final String keyStorePassword = "111111";
    protected static final String brokerKeyStorePath = Resources.getResource("certificate-authority/jks/broker.keystore.jks").getPath();
    protected static final String brokerTrustStorePath = Resources.getResource("certificate-authority/jks/broker.truststore.jks").getPath();
    protected static final String clientKeyStorePath = Resources.getResource("certificate-authority/jks/client.keystore.jks").getPath();
    protected static final String clientTrustStorePath = Resources.getResource("certificate-authority/jks/client.truststore.jks").getPath();
    private static final Logger log = LoggerFactory.getLogger(ReplicatorTestBase.class);
    ServiceConfiguration config1 = new ServiceConfiguration();
    ServiceConfiguration config2 = new ServiceConfiguration();
    ServiceConfiguration config3 = new ServiceConfiguration();
    ServiceConfiguration config4 = new ServiceConfiguration();
    protected final String brokerCertFilePath = Resources.getResource("certificate-authority/server-keys/broker.cert.pem").getPath();
    protected final String brokerFilePath = Resources.getResource("certificate-authority/server-keys/broker.key-pk8.pem").getPath();
    protected final String clientCertFilePath = Resources.getResource("certificate-authority/client-keys/admin.cert.pem").getPath();
    protected final String clientKeyFilePath = Resources.getResource("certificate-authority/client-keys/admin.key-pk8.pem").getPath();
    protected final String caCertFilePath = Resources.getResource("certificate-authority/certs/ca.cert.pem").getPath();
    protected boolean tlsWithKeyStore = false;
    protected final String cluster1 = "r1";
    protected final String cluster2 = "r2";
    protected final String cluster3 = "r3";
    protected final String cluster4 = "r4";

    /* loaded from: input_file:org/apache/pulsar/broker/service/ReplicatorTestBase$MessageConsumer.class */
    static class MessageConsumer implements AutoCloseable {
        final URL url;
        final String namespace;
        final String topicName;
        final PulsarClient client;
        final Consumer<byte[]> consumer;

        /* JADX INFO: Access modifiers changed from: package-private */
        public MessageConsumer(URL url, TopicName topicName) throws Exception {
            this(url, topicName, "sub-id");
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public MessageConsumer(URL url, TopicName topicName, String str) throws Exception {
            this.url = url;
            this.namespace = topicName.getNamespace();
            this.topicName = topicName.toString();
            this.client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                this.consumer = this.client.newConsumer().topic(new String[]{this.topicName}).subscriptionName(str).subscribe();
            } catch (Exception e) {
                this.client.close();
                throw e;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void receive(int i) throws Exception {
            ReplicatorTestBase.log.info("Start receiving messages");
            TreeSet treeSet = new TreeSet();
            int i2 = 0;
            while (i2 < i) {
                Message receive = this.consumer.receive(10, TimeUnit.SECONDS);
                Assert.assertNotNull(receive);
                this.consumer.acknowledge(receive);
                String str = new String(receive.getData());
                ReplicatorTestBase.log.info("Received message {}", str);
                if (treeSet.add(str)) {
                    Assert.assertEquals(str, "test-" + i2);
                    i2++;
                } else {
                    ReplicatorTestBase.log.info("Ignoring duplicate {}", str);
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public boolean drained() throws Exception {
            return this.consumer.receive(0, TimeUnit.MICROSECONDS) == null;
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            try {
                this.client.close();
            } catch (PulsarClientException e) {
                ReplicatorTestBase.log.warn("Failed to close client", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/service/ReplicatorTestBase$MessageProducer.class */
    static class MessageProducer implements AutoCloseable {
        URL url;
        String namespace;
        String topicName;
        PulsarClient client;
        Producer<byte[]> producer;

        /* JADX INFO: Access modifiers changed from: package-private */
        public MessageProducer(URL url, TopicName topicName) throws Exception {
            this.url = url;
            this.namespace = topicName.getNamespace();
            this.topicName = topicName.toString();
            this.client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                this.producer = this.client.newProducer().topic(this.topicName).enableBatching(false).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
            } catch (Exception e) {
                this.client.close();
                throw e;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public MessageProducer(URL url, TopicName topicName, boolean z) throws Exception {
            this.url = url;
            this.namespace = topicName.getNamespace();
            this.topicName = topicName.toString();
            this.client = PulsarClient.builder().serviceUrl(url.toString()).statsInterval(0L, TimeUnit.SECONDS).build();
            try {
                this.producer = this.client.newProducer().topic(this.topicName).enableBatching(z).batchingMaxPublishDelay(1L, TimeUnit.SECONDS).batchingMaxMessages(5).create();
            } catch (Exception e) {
                this.client.close();
                throw e;
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void produceBatch(int i) throws Exception {
            ReplicatorTestBase.log.info("Start sending batch messages");
            for (int i2 = 0; i2 < i; i2++) {
                this.producer.sendAsync(("test-" + i2).getBytes());
                ReplicatorTestBase.log.info("queued message {}", "test-" + i2);
            }
            this.producer.flush();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void produce(int i) throws Exception {
            ReplicatorTestBase.log.info("Start sending messages");
            for (int i2 = 0; i2 < i; i2++) {
                this.producer.send(("test-" + i2).getBytes());
                ReplicatorTestBase.log.info("Sent message {}", "test-" + i2);
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public TypedMessageBuilder<byte[]> newMessage() {
            return this.producer.newMessage();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void produce(int i, TypedMessageBuilder<byte[]> typedMessageBuilder) throws Exception {
            ReplicatorTestBase.log.info("Start sending messages");
            for (int i2 = 0; i2 < i; i2++) {
                String str = "test-" + i2;
                typedMessageBuilder.value(str.getBytes()).send();
                ReplicatorTestBase.log.info("Sent message {}", str);
            }
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            try {
                this.client.close();
            } catch (PulsarClientException e) {
                ReplicatorTestBase.log.warn("Failed to close client", e);
            }
        }
    }

    public int getBrokerServicePurgeInactiveFrequency() {
        return 60;
    }

    public boolean isBrokerServicePurgeInactiveTopic() {
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setup() throws Exception {
        incrementSetupNumber();
        log.info("--- Starting ReplicatorTestBase::setup ---");
        this.executor = new ThreadPoolExecutor(5, 20, 30L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) new LinkedBlockingQueue(), (ThreadFactory) new DefaultThreadFactory("ReplicatorTestBase"));
        this.globalZkS = new ZookeeperServerTest(0);
        this.globalZkS.start();
        this.bkEnsemble1 = new LocalBookkeeperEnsemble(3, 0, () -> {
            return 0;
        });
        this.bkEnsemble1.start();
        setConfig1DefaultValue();
        this.pulsar1 = new PulsarService(this.config1);
        this.pulsar1.start();
        this.ns1 = this.pulsar1.getBrokerService();
        this.url1 = new URL(this.pulsar1.getWebServiceAddress());
        this.urlTls1 = new URL(this.pulsar1.getWebServiceAddressTls());
        this.admin1 = PulsarAdmin.builder().serviceHttpUrl(this.url1.toString()).build();
        this.bkEnsemble2 = new LocalBookkeeperEnsemble(3, 0, () -> {
            return 0;
        });
        this.bkEnsemble2.start();
        setConfig2DefaultValue();
        this.pulsar2 = new PulsarService(this.config2);
        this.pulsar2.start();
        this.ns2 = this.pulsar2.getBrokerService();
        this.url2 = new URL(this.pulsar2.getWebServiceAddress());
        this.urlTls2 = new URL(this.pulsar2.getWebServiceAddressTls());
        this.admin2 = PulsarAdmin.builder().serviceHttpUrl(this.url2.toString()).build();
        this.bkEnsemble3 = new LocalBookkeeperEnsemble(3, 0, () -> {
            return 0;
        });
        this.bkEnsemble3.start();
        setConfig3DefaultValue();
        this.pulsar3 = new PulsarService(this.config3);
        this.pulsar3.start();
        this.ns3 = this.pulsar3.getBrokerService();
        this.url3 = new URL(this.pulsar3.getWebServiceAddress());
        this.urlTls3 = new URL(this.pulsar3.getWebServiceAddressTls());
        this.admin3 = PulsarAdmin.builder().serviceHttpUrl(this.url3.toString()).build();
        this.bkEnsemble4 = new LocalBookkeeperEnsemble(3, 0, () -> {
            return 0;
        });
        this.bkEnsemble4.start();
        setConfig4DefaultValue();
        this.pulsar4 = new PulsarService(this.config4);
        this.pulsar4.start();
        this.url4 = new URL(this.pulsar4.getWebServiceAddress());
        this.urlTls4 = new URL(this.pulsar4.getWebServiceAddressTls());
        this.admin4 = PulsarAdmin.builder().serviceHttpUrl(this.url4.toString()).build();
        this.admin1.clusters().createCluster("r1", ClusterData.builder().serviceUrl(this.url1.toString()).serviceUrlTls(this.urlTls1.toString()).brokerServiceUrl(this.pulsar1.getBrokerServiceUrl()).brokerServiceUrlTls(this.pulsar1.getBrokerServiceUrlTls()).brokerClientTlsEnabled(true).brokerClientCertificateFilePath(this.clientCertFilePath).brokerClientKeyFilePath(this.clientKeyFilePath).brokerClientTrustCertsFilePath(this.caCertFilePath).brokerClientTlsEnabledWithKeyStore(this.tlsWithKeyStore).brokerClientTlsKeyStore(clientKeyStorePath).brokerClientTlsKeyStorePassword("111111").brokerClientTlsKeyStoreType("JKS").brokerClientTlsTrustStore(clientTrustStorePath).brokerClientTlsTrustStorePassword("111111").brokerClientTlsTrustStoreType("JKS").build());
        this.admin1.clusters().createCluster("r2", ClusterData.builder().serviceUrl(this.url2.toString()).serviceUrlTls(this.urlTls2.toString()).brokerServiceUrl(this.pulsar2.getBrokerServiceUrl()).brokerServiceUrlTls(this.pulsar2.getBrokerServiceUrlTls()).brokerClientTlsEnabled(true).brokerClientCertificateFilePath(this.clientCertFilePath).brokerClientKeyFilePath(this.clientKeyFilePath).brokerClientTrustCertsFilePath(this.caCertFilePath).brokerClientTlsEnabledWithKeyStore(this.tlsWithKeyStore).brokerClientTlsKeyStore(clientKeyStorePath).brokerClientTlsKeyStorePassword("111111").brokerClientTlsKeyStoreType("JKS").brokerClientTlsTrustStore(clientTrustStorePath).brokerClientTlsTrustStorePassword("111111").brokerClientTlsTrustStoreType("JKS").build());
        this.admin1.clusters().createCluster("r3", ClusterData.builder().serviceUrl(this.url3.toString()).serviceUrlTls(this.urlTls3.toString()).brokerServiceUrl(this.pulsar3.getBrokerServiceUrl()).brokerServiceUrlTls(this.pulsar3.getBrokerServiceUrlTls()).brokerClientTlsEnabled(true).brokerClientCertificateFilePath(this.clientCertFilePath).brokerClientKeyFilePath(this.clientKeyFilePath).brokerClientTrustCertsFilePath(this.caCertFilePath).brokerClientTlsEnabledWithKeyStore(this.tlsWithKeyStore).brokerClientTlsKeyStore(clientKeyStorePath).brokerClientTlsKeyStorePassword("111111").brokerClientTlsKeyStoreType("JKS").brokerClientTlsTrustStore(clientTrustStorePath).brokerClientTlsTrustStorePassword("111111").brokerClientTlsTrustStoreType("JKS").build());
        this.admin4.clusters().createCluster("r4", ClusterData.builder().serviceUrl(this.url4.toString()).serviceUrlTls(this.urlTls4.toString()).brokerServiceUrl(this.pulsar4.getBrokerServiceUrl()).brokerServiceUrlTls(this.pulsar4.getBrokerServiceUrlTls()).brokerClientTlsEnabled(true).brokerClientCertificateFilePath(this.clientCertFilePath).brokerClientKeyFilePath(this.clientKeyFilePath).brokerClientTrustCertsFilePath(this.caCertFilePath).brokerClientTlsEnabledWithKeyStore(this.tlsWithKeyStore).brokerClientTlsKeyStore(clientKeyStorePath).brokerClientTlsKeyStorePassword("111111").brokerClientTlsKeyStoreType("JKS").brokerClientTlsTrustStore(clientTrustStorePath).brokerClientTlsTrustStorePassword("111111").brokerClientTlsTrustStoreType("JKS").build());
        this.admin1.tenants().createTenant(DefaultPasswordAuthImpl.DEFAULT_PASSWORD, new TenantInfoImpl(Sets.newHashSet(new String[]{"appid1", "appid2", "appid3"}), Sets.newHashSet(new String[]{"r1", "r2", "r3"})));
        this.admin1.namespaces().createNamespace("pulsar/ns", Sets.newHashSet(new String[]{"r1", "r2", "r3"}));
        this.admin1.namespaces().createNamespace("pulsar/ns1", Sets.newHashSet(new String[]{"r1", "r2"}));
        Assert.assertEquals(this.admin2.clusters().getCluster("r1").getServiceUrl(), this.url1.toString());
        Assert.assertEquals(this.admin2.clusters().getCluster("r2").getServiceUrl(), this.url2.toString());
        Assert.assertEquals(this.admin2.clusters().getCluster("r3").getServiceUrl(), this.url3.toString());
        Assert.assertEquals(this.admin2.clusters().getCluster("r1").getBrokerServiceUrl(), this.pulsar1.getBrokerServiceUrl());
        Assert.assertEquals(this.admin2.clusters().getCluster("r2").getBrokerServiceUrl(), this.pulsar2.getBrokerServiceUrl());
        Assert.assertEquals(this.admin2.clusters().getCluster("r3").getBrokerServiceUrl(), this.pulsar3.getBrokerServiceUrl());
        this.admin1.clusters().createCluster("global", ClusterData.builder().serviceUrl("http://global:8080").serviceUrlTls("https://global:8443").build());
        this.admin1.namespaces().createNamespace("pulsar/global/ns");
        this.admin1.namespaces().setNamespaceReplicationClusters("pulsar/global/ns", Sets.newHashSet(new String[]{"r1", "r2", "r3"}));
        Thread.sleep(100L);
        log.info("--- ReplicatorTestBase::setup completed ---");
    }

    public void setConfig3DefaultValue() {
        setConfigDefaults(this.config3, "r3", this.bkEnsemble3);
        this.config3.setTlsEnabled(true);
    }

    public void setConfig1DefaultValue() {
        setConfigDefaults(this.config1, "r1", this.bkEnsemble1);
    }

    public void setConfig2DefaultValue() {
        setConfigDefaults(this.config2, "r2", this.bkEnsemble2);
    }

    public void setConfig4DefaultValue() {
        setConfigDefaults(this.config4, "r4", this.bkEnsemble4);
        this.config4.setEnableReplicatedSubscriptions(false);
    }

    private void setConfigDefaults(ServiceConfiguration serviceConfiguration, String str, LocalBookkeeperEnsemble localBookkeeperEnsemble) {
        serviceConfiguration.setClusterName(str);
        serviceConfiguration.setAdvertisedAddress("localhost");
        serviceConfiguration.setWebServicePort(Optional.of(0));
        serviceConfiguration.setWebServicePortTls(Optional.of(0));
        serviceConfiguration.setMetadataStoreUrl("zk:127.0.0.1:" + localBookkeeperEnsemble.getZookeeperPort());
        serviceConfiguration.setConfigurationMetadataStoreUrl("zk:127.0.0.1:" + this.globalZkS.getZookeeperPort() + "/foo");
        serviceConfiguration.setBrokerDeleteInactiveTopicsEnabled(isBrokerServicePurgeInactiveTopic());
        serviceConfiguration.setBrokerDeleteInactiveTopicsFrequencySeconds(inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
        serviceConfiguration.setBrokerShutdownTimeoutMs(0L);
        serviceConfiguration.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(Double.valueOf(1.0d)));
        serviceConfiguration.setBrokerServicePort(Optional.of(0));
        serviceConfiguration.setBrokerServicePortTls(Optional.of(0));
        serviceConfiguration.setTlsCertificateFilePath(this.brokerCertFilePath);
        serviceConfiguration.setTlsKeyFilePath(this.brokerFilePath);
        serviceConfiguration.setTlsTrustCertsFilePath(this.caCertFilePath);
        serviceConfiguration.setTlsEnabledWithKeyStore(this.tlsWithKeyStore);
        serviceConfiguration.setTlsKeyStore(brokerKeyStorePath);
        serviceConfiguration.setTlsKeyStoreType("JKS");
        serviceConfiguration.setTlsKeyStorePassword("111111");
        serviceConfiguration.setTlsTrustStore(brokerTrustStorePath);
        serviceConfiguration.setTlsTrustStoreType("JKS");
        serviceConfiguration.setTlsTrustStorePassword("111111");
        serviceConfiguration.setBacklogQuotaCheckIntervalInSeconds(5);
        serviceConfiguration.setDefaultNumberOfNamespaceBundles(1);
        serviceConfiguration.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
        serviceConfiguration.setEnableReplicatedSubscriptions(true);
        serviceConfiguration.setReplicatedSubscriptionsSnapshotFrequencyMillis(1000);
    }

    public void resetConfig1() {
        this.config1 = new ServiceConfiguration();
        setConfig1DefaultValue();
    }

    public void resetConfig2() {
        this.config2 = new ServiceConfiguration();
        setConfig2DefaultValue();
    }

    public void resetConfig3() {
        this.config3 = new ServiceConfiguration();
        setConfig3DefaultValue();
    }

    public void resetConfig4() {
        this.config4 = new ServiceConfiguration();
        setConfig4DefaultValue();
    }

    private int inSec(int i, TimeUnit timeUnit) {
        return (int) TimeUnit.SECONDS.convert(i, timeUnit);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void cleanup() throws Exception {
        markCurrentSetupNumberCleaned();
        log.info("--- Shutting down ---");
        if (this.executor != null) {
            this.executor.shutdownNow();
            this.executor = null;
        }
        this.admin1.close();
        this.admin2.close();
        this.admin3.close();
        this.admin4.close();
        if (this.pulsar4 != null) {
            this.pulsar4.close();
        }
        if (this.pulsar3 != null) {
            this.pulsar3.close();
        }
        if (this.pulsar2 != null) {
            this.pulsar2.close();
        }
        if (this.pulsar1 != null) {
            this.pulsar1.close();
        }
        this.bkEnsemble1.stop();
        this.bkEnsemble2.stop();
        this.bkEnsemble3.stop();
        this.bkEnsemble4.stop();
        this.globalZkS.stop();
        resetConfig1();
        resetConfig2();
        resetConfig3();
        resetConfig4();
    }
}
