package io.confluent.kafkarest.integration;

import io.confluent.common.utils.IntegrationTest;
import io.confluent.kafka.schemaregistry.avro.AvroCompatibilityLevel;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaJsonSerializer;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.ProducerPool;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.ExecutionException;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Configurable;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import kafka.zk.KafkaZkClient;
import kafka.zookeeper.ZooKeeperClient;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.JaasUtils;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Time;
import org.eclipse.jetty.server.Server;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.experimental.categories.Category;
import scala.Option;
import scala.collection.JavaConverters;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/kafkarest/integration/ClusterTestHarness.class */
public abstract class ClusterTestHarness {
    public static final int DEFAULT_NUM_BROKERS = 1;
    private int numBrokers;
    private boolean withSchemaRegistry;
    protected String zkConnect;
    protected EmbeddedZookeeper zookeeper;
    protected KafkaZkClient zkClient;
    protected int zkConnectionTimeout;
    protected int zkSessionTimeout;
    protected List<KafkaConfig> configs;
    protected List<KafkaServer> servers;
    protected String brokerList;
    protected String plaintextBrokerList;
    protected String schemaRegCompatibility;
    protected Properties schemaRegProperties;
    protected String schemaRegConnect;
    protected SchemaRegistryRestApplication schemaRegApp;
    protected Server schemaRegServer;
    protected Properties restProperties;
    protected KafkaRestConfig restConfig;
    protected TestKafkaRestApplication restApp;
    protected Server restServer;
    protected String restConnect;

    public static int[] choosePorts(int i) {
        try {
            ServerSocket[] serverSocketArr = new ServerSocket[i];
            int[] iArr = new int[i];
            for (int i2 = 0; i2 < i; i2++) {
                serverSocketArr[i2] = new ServerSocket(0);
                iArr[i2] = serverSocketArr[i2].getLocalPort();
            }
            for (int i3 = 0; i3 < i; i3++) {
                serverSocketArr[i3].close();
            }
            return iArr;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static int choosePort() {
        return choosePorts(1)[0];
    }

    public ClusterTestHarness() {
        this(1, false);
    }

    public ClusterTestHarness(int i, boolean z) {
        this.zkConnectionTimeout = 10000;
        this.zkSessionTimeout = 6000;
        this.configs = null;
        this.servers = null;
        this.brokerList = null;
        this.plaintextBrokerList = null;
        this.schemaRegCompatibility = AvroCompatibilityLevel.NONE.name;
        this.schemaRegProperties = null;
        this.schemaRegConnect = null;
        this.schemaRegApp = null;
        this.schemaRegServer = null;
        this.restProperties = null;
        this.restConfig = null;
        this.restApp = null;
        this.restServer = null;
        this.restConnect = null;
        this.numBrokers = i;
        this.withSchemaRegistry = z;
        this.schemaRegProperties = new Properties();
        this.restProperties = new Properties();
    }

    public Properties overrideBrokerProperties(int i, Properties properties) {
        return properties;
    }

    public Properties overrideSchemaRegistryProps(Properties properties) {
        return properties;
    }

    @Before
    public void setUp() throws Exception {
        this.zookeeper = new EmbeddedZookeeper();
        this.zkConnect = String.format("127.0.0.1:%d", Integer.valueOf(this.zookeeper.port()));
        Time time = Time.SYSTEM;
        this.zkClient = new KafkaZkClient(new ZooKeeperClient(this.zkConnect, this.zkSessionTimeout, this.zkConnectionTimeout, Integer.MAX_VALUE, time, "testMetricGroup", "testMetricGroupType"), JaasUtils.isZkSaslEnabled(), time);
        this.configs = new Vector();
        this.servers = new Vector();
        for (int i = 0; i < this.numBrokers; i++) {
            KafkaConfig fromProps = KafkaConfig.fromProps(overrideBrokerProperties(i, getBrokerProperties(i)));
            this.configs.add(fromProps);
            this.servers.add(TestUtils.createServer(fromProps, time));
        }
        this.brokerList = TestUtils.getBrokerListStrFromServers(JavaConverters.asScalaBuffer(this.servers), getBrokerSecurityProtocol());
        this.plaintextBrokerList = TestUtils.getBrokerListStrFromServers(JavaConverters.asScalaBuffer(this.servers), SecurityProtocol.PLAINTEXT);
        setupAcls();
        if (this.withSchemaRegistry) {
            int choosePort = choosePort();
            this.schemaRegProperties.put("port", Integer.valueOf(choosePort).toString());
            this.schemaRegProperties.put("kafkastore.connection.url", this.zkConnect);
            this.schemaRegProperties.put("kafkastore.topic", "_schemas");
            this.schemaRegProperties.put("avro.compatibility.level", this.schemaRegCompatibility);
            this.schemaRegProperties.put("kafkastore.bootstrap.servers", SecurityProtocol.PLAINTEXT.name + "://" + TestUtils.getBrokerListStrFromServers(JavaConverters.asScalaBuffer(this.servers), SecurityProtocol.PLAINTEXT));
            this.schemaRegConnect = String.format("http://localhost:%d", Integer.valueOf(choosePort));
            this.schemaRegProperties = overrideSchemaRegistryProps(this.schemaRegProperties);
            this.schemaRegApp = new SchemaRegistryRestApplication(new SchemaRegistryConfig(this.schemaRegProperties));
            this.schemaRegServer = this.schemaRegApp.createServer();
            this.schemaRegServer.start();
        }
        int choosePort2 = choosePort();
        this.restProperties.put("port", Integer.valueOf(choosePort2).toString());
        this.restProperties.put("bootstrap.servers", this.brokerList);
        overrideKafkaRestConfigs(this.restProperties);
        if (this.withSchemaRegistry) {
            this.restProperties.put("schema.registry.url", this.schemaRegConnect);
        }
        this.restConnect = getRestConnectString(choosePort2);
        this.restProperties.put("listeners", this.restConnect);
        this.restConfig = new KafkaRestConfig(this.restProperties);
        this.restApp = new TestKafkaRestApplication(this.restConfig, getProducerPool(this.restConfig), null);
        this.restServer = this.restApp.createServer();
        this.restServer.start();
    }

    protected void setupAcls() {
    }

    protected SecurityProtocol getBrokerSecurityProtocol() {
        return SecurityProtocol.PLAINTEXT;
    }

    protected String getRestConnectString(int i) {
        return String.format("http://localhost:%d", Integer.valueOf(i));
    }

    protected void overrideKafkaRestConfigs(Properties properties) {
    }

    protected Properties getBrokerProperties(int i) {
        Option apply = Option.apply((Object) null);
        Properties createBrokerConfig = TestUtils.createBrokerConfig(i, this.zkConnect, false, false, TestUtils.RandomPort(), Option.apply(getBrokerSecurityProtocol()), apply, Option.empty(), true, false, TestUtils.RandomPort(), false, TestUtils.RandomPort(), false, TestUtils.RandomPort(), Option.empty(), 1, false, 1, (short) 1);
        createBrokerConfig.setProperty("auto.create.topics.enable", "false");
        createBrokerConfig.setProperty("zookeeper.connect", this.zkConnect);
        return createBrokerConfig;
    }

    protected ProducerPool getProducerPool(KafkaRestConfig kafkaRestConfig) {
        return null;
    }

    @After
    public void tearDown() throws Exception {
        if (this.restServer != null) {
            this.restServer.stop();
            this.restServer.join();
        }
        if (this.schemaRegServer != null) {
            this.schemaRegServer.stop();
            this.schemaRegServer.join();
        }
        Iterator<KafkaServer> it = this.servers.iterator();
        while (it.hasNext()) {
            it.next().shutdown();
        }
        Iterator<KafkaServer> it2 = this.servers.iterator();
        while (it2.hasNext()) {
            CoreUtils.delete(it2.next().config().logDirs());
        }
        this.zkClient.close();
        this.zookeeper.shutdown();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Invocation.Builder request(String str) {
        return request(str, null, null, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Invocation.Builder request(String str, Map<String, String> map) {
        return request(str, null, null, map);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Invocation.Builder request(String str, String str2, Object obj) {
        return request(str, str2, obj, null);
    }

    protected Invocation.Builder request(String str, String str2, Object obj, Map<String, String> map) {
        Configurable client = getClient();
        this.restApp.configureBaseApplication(client);
        URI uri = null;
        try {
            uri = new URI(str);
        } catch (URISyntaxException e) {
        }
        WebTarget path = (uri == null || !uri.isAbsolute()) ? client.target(this.restConnect).path(str) : client.target(str);
        if (str2 != null && obj != null) {
            path = path.resolveTemplate(str2, obj);
        }
        if (map != null) {
            for (Map.Entry<String, String> entry : map.entrySet()) {
                path = path.queryParam(entry.getKey(), new Object[]{entry.getValue()});
            }
        }
        return path.request();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Client getClient() {
        return ClientBuilder.newClient();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final String getClusterId() {
        Properties adminProperties = this.restConfig.getAdminProperties();
        adminProperties.put("bootstrap.servers", this.brokerList);
        try {
            return (String) AdminClient.create(adminProperties).describeCluster().clusterId().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final int getControllerID() {
        Properties adminProperties = this.restConfig.getAdminProperties();
        adminProperties.put("bootstrap.servers", this.brokerList);
        try {
            return ((Node) AdminClient.create(adminProperties).describeCluster().controller().get()).id();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final ArrayList<Node> getBrokers() {
        Properties adminProperties = this.restConfig.getAdminProperties();
        adminProperties.put("bootstrap.servers", this.brokerList);
        try {
            return new ArrayList<>((Collection) AdminClient.create(adminProperties).describeCluster().nodes().get());
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Set<String> getTopicNames() {
        Properties adminProperties = this.restConfig.getAdminProperties();
        adminProperties.put("bootstrap.servers", this.brokerList);
        try {
            return (Set) AdminClient.create(adminProperties).listTopics().names().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(String.format("Failed to create topic: %s", e.getMessage()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void createTopic(String str, int i, short s) {
        Properties adminProperties = this.restConfig.getAdminProperties();
        adminProperties.put("bootstrap.servers", this.brokerList);
        try {
            AdminClient.create(adminProperties).createTopics(Collections.singletonList(new NewTopic(str, i, s))).all().get();
        } catch (InterruptedException | ExecutionException e) {
            Assert.fail(String.format("Failed to create topic: %s", e.getMessage()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void createTopic(String str, Map<Integer, List<Integer>> map) {
        Properties adminProperties = this.restConfig.getAdminProperties();
        adminProperties.put("bootstrap.servers", this.brokerList);
        try {
            AdminClient.create(adminProperties).createTopics(Collections.singletonList(new NewTopic(str, map))).all().get();
        } catch (InterruptedException | ExecutionException e) {
            Assert.fail(String.format("Failed to create topic: %s", e.getMessage()));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void alterPartitionReassignment(Map<TopicPartition, Optional<NewPartitionReassignment>> map) {
        Properties adminProperties = this.restConfig.getAdminProperties();
        adminProperties.put("bootstrap.servers", this.brokerList);
        AdminClient.create(adminProperties).alterPartitionReassignments(map);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void produceAvroMessages(List<ProducerRecord<Object, Object>> list) {
        HashMap hashMap = new HashMap();
        hashMap.put("schema.registry.url", this.schemaRegConnect);
        KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer();
        kafkaAvroSerializer.configure(hashMap, true);
        KafkaAvroSerializer kafkaAvroSerializer2 = new KafkaAvroSerializer();
        kafkaAvroSerializer2.configure(hashMap, false);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerList);
        properties.put("acks", "all");
        properties.put("key.serializer", ByteArraySerializer.class);
        properties.put("value.serializer", ByteArraySerializer.class);
        KafkaProducer kafkaProducer = new KafkaProducer(properties, kafkaAvroSerializer, kafkaAvroSerializer2);
        Iterator<ProducerRecord<Object, Object>> it = list.iterator();
        while (it.hasNext()) {
            try {
                kafkaProducer.send(it.next()).get();
            } catch (Exception e) {
                Assert.fail("Couldn't produce input messages to Kafka: " + e);
            }
        }
        kafkaProducer.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void produceBinaryMessages(List<ProducerRecord<byte[], byte[]>> list) {
        Properties properties = new Properties();
        properties.put("key.serializer", ByteArraySerializer.class);
        properties.put("value.serializer", ByteArraySerializer.class);
        properties.setProperty("bootstrap.servers", this.brokerList);
        properties.setProperty("acks", "all");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        Iterator<ProducerRecord<byte[], byte[]>> it = list.iterator();
        while (it.hasNext()) {
            try {
                kafkaProducer.send(it.next()).get();
            } catch (Exception e) {
                Assert.fail("Couldn't produce input messages to Kafka: " + e);
            }
        }
        kafkaProducer.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void produceJsonMessages(List<ProducerRecord<Object, Object>> list) {
        Properties properties = new Properties();
        properties.put("key.serializer", KafkaJsonSerializer.class);
        properties.put("value.serializer", KafkaJsonSerializer.class);
        properties.setProperty("bootstrap.servers", this.brokerList);
        properties.setProperty("acks", "all");
        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        Iterator<ProducerRecord<Object, Object>> it = list.iterator();
        while (it.hasNext()) {
            try {
                kafkaProducer.send(it.next()).get();
            } catch (Exception e) {
                Assert.fail("Couldn't produce input messages to Kafka: " + e);
            }
        }
        kafkaProducer.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<Integer, List<Integer>> createAssignment(List<Integer> list, int i) {
        HashMap hashMap = new HashMap();
        for (int i2 = 0; i2 < i; i2++) {
            hashMap.put(Integer.valueOf(i2), list);
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Map<TopicPartition, Optional<NewPartitionReassignment>> createReassignment(List<Integer> list, String str, int i) {
        HashMap hashMap = new HashMap();
        for (int i2 = 0; i2 < i; i2++) {
            hashMap.put(new TopicPartition(str, i2), Optional.of(new NewPartitionReassignment(list)));
        }
        return hashMap;
    }
}
