/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.metrics.reporter.integration;

import io.confluent.common.utils.IntegrationTest;
import java.util.List;
import java.util.Properties;
import java.util.Vector;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.experimental.categories.Category;
import scala.Option;
import scala.Option$;
import scala.collection.JavaConversions;
import scala.collection.Seq;

@Category(value={IntegrationTest.class})
public class MetricReporterClusterTestHarness {
    protected static final Option<Properties> EMPTY_SASL_PROPERTIES = Option$.MODULE$.empty();
    protected EmbeddedZookeeper zookeeper;
    protected String zkConnect;
    protected ZkUtils zkUtils;
    protected int zkConnectionTimeout = 30000;
    protected int zkSessionTimeout = 6000;
    protected List<KafkaConfig> configs = null;
    protected List<KafkaServer> servers = null;
    protected String brokerList = null;
    protected boolean enableMetricsReporterOnBroker = true;

    private boolean zkAcls() {
        return this.securityProtocol() == SecurityProtocol.SASL_PLAINTEXT || this.securityProtocol() == SecurityProtocol.SASL_SSL;
    }

    @Before
    public void setUp() throws Exception {
        this.zookeeper = new EmbeddedZookeeper();
        this.zkConnect = String.format("localhost:%d", this.zookeeper.port());
        this.zkUtils = ZkUtils.apply((String)this.zkConnect, (int)this.zkSessionTimeout, (int)this.zkConnectionTimeout, (boolean)this.zkAcls());
        this.configs = new Vector<KafkaConfig>();
        this.servers = new Vector<KafkaServer>();
        KafkaConfig config = this.kafkaConfig(0);
        this.configs.add(config);
        KafkaServer server = TestUtils.createServer((KafkaConfig)config, (Time)Time.SYSTEM);
        this.servers.add(server);
        String broker0List = TestUtils.getBrokerListStrFromServers((Seq)JavaConversions.asScalaBuffer(this.servers), (SecurityProtocol)this.securityProtocol());
        config = this.enableMetricsReporterOnBroker ? this.kafkaConfigWithMetricReporter(1, broker0List) : this.kafkaConfig(1);
        this.configs.add(config);
        server = TestUtils.createServer((KafkaConfig)config, (Time)Time.SYSTEM);
        this.servers.add(server);
        TestUtils.createTopic((ZkUtils)this.zkUtils, (String)"__consumer_offsets", (int)config.getInt(KafkaConfig.OffsetsTopicPartitionsProp()), (int)1, (Seq)JavaConversions.asScalaBuffer(this.servers), (Properties)this.servers.get(0).groupCoordinator().offsetsTopicConfigs());
        this.brokerList = TestUtils.getBrokerListStrFromServers((Seq)JavaConversions.asScalaBuffer(this.servers), (SecurityProtocol)this.securityProtocol());
    }

    protected void injectProperties(Properties props) {
        props.setProperty(KafkaConfig.AutoCreateTopicsEnableProp(), "false");
        props.setProperty(KafkaConfig.NumPartitionsProp(), "1");
    }

    protected void injectMetricReporterProperties(Properties props, String brokerList) {
        props.setProperty(KafkaConfig.MetricReporterClassesProp(), "io.confluent.metrics.reporter.ConfluentMetricsReporter");
        props.setProperty("confluent.metrics.reporter.bootstrap.servers", brokerList);
        props.setProperty("confluent.metrics.reporter.topic.replicas", "1");
        props.setProperty("confluent.metrics.reporter.publish.ms", "500");
        props.setProperty("confluent.metrics.reporter.whitelist", "");
        props.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1");
    }

    protected KafkaConfig kafkaConfig(int brokerId) {
        Option noFile = Option.apply(null);
        Option noInterBrokerSecurityProtocol = Option.apply(null);
        Properties props = TestUtils.createBrokerConfig((int)brokerId, (String)this.zkConnect, (boolean)false, (boolean)false, (int)TestUtils.RandomPort(), (Option)noInterBrokerSecurityProtocol, (Option)noFile, EMPTY_SASL_PROPERTIES, (boolean)true, (boolean)false, (int)TestUtils.RandomPort(), (boolean)false, (int)TestUtils.RandomPort(), (boolean)false, (int)TestUtils.RandomPort(), (Option)Option.empty(), (int)1);
        this.injectProperties(props);
        return KafkaConfig.fromProps((Properties)props);
    }

    private KafkaConfig kafkaConfigWithMetricReporter(int brokerId, String brokerList) {
        Option noFile = Option.apply(null);
        Option noInterBrokerSecurityProtocol = Option.apply(null);
        Properties props = TestUtils.createBrokerConfig((int)brokerId, (String)this.zkConnect, (boolean)false, (boolean)false, (int)TestUtils.RandomPort(), (Option)noInterBrokerSecurityProtocol, (Option)noFile, EMPTY_SASL_PROPERTIES, (boolean)true, (boolean)false, (int)TestUtils.RandomPort(), (boolean)false, (int)TestUtils.RandomPort(), (boolean)false, (int)TestUtils.RandomPort(), (Option)Option.empty(), (int)1);
        this.injectProperties(props);
        this.injectMetricReporterProperties(props, brokerList);
        return KafkaConfig.fromProps((Properties)props);
    }

    protected SecurityProtocol securityProtocol() {
        return SecurityProtocol.PLAINTEXT;
    }

    @After
    public void tearDown() throws Exception {
        if (this.servers != null) {
            for (KafkaServer server : this.servers) {
                server.shutdown();
            }
            for (KafkaServer server : this.servers) {
                CoreUtils.delete((Seq)server.config().logDirs());
            }
        }
        if (this.zkUtils != null) {
            this.zkUtils.close();
        }
        if (this.zookeeper != null) {
            this.zookeeper.shutdown();
        }
    }
}

