/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.io.File;
import java.net.BindException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import kafka.admin.AdminUtils;
import kafka.api.PartitionMetadata;
import kafka.cluster.BrokerEndPoint;
import kafka.common.KafkaException;
import kafka.network.SocketServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.SystemTime$;
import kafka.utils.Time;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.Seq;

public class KafkaTestEnvironmentImpl
extends KafkaTestEnvironment {
    protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
    private File tmpZkDir;
    private File tmpKafkaParent;
    private List<File> tmpKafkaDirs;
    private List<KafkaServer> brokers;
    private TestingServer zookeeper;
    private String zookeeperConnectionString;
    private String brokerConnectionString = "";
    private Properties standardProps;
    private Properties additionalServerProperties;

    public String getBrokerConnectionString() {
        return this.brokerConnectionString;
    }

    public Properties getStandardProperties() {
        return this.standardProps;
    }

    public String getVersion() {
        return "0.9";
    }

    public List<KafkaServer> getBrokers() {
        return this.brokers;
    }

    public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props) {
        return new FlinkKafkaConsumer09(topics, readSchema, props);
    }

    public <T> FlinkKafkaProducerBase<T> getProducer(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
        FlinkKafkaProducer09 prod = new FlinkKafkaProducer09(topic, serSchema, props, partitioner);
        prod.setFlushOnCheckpoint(true);
        return prod;
    }

    public void restartBroker(int leaderId) throws Exception {
        this.brokers.set(leaderId, this.getKafkaServer(leaderId, this.tmpKafkaDirs.get(leaderId)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int getLeaderToShutDown(String topic) throws Exception {
        try (ZkUtils zkUtils = this.getZkUtils();){
            Seq partitionMetadata;
            PartitionMetadata firstPart = null;
            do {
                if (firstPart == null) continue;
                LOG.info("Unable to find leader. error code {}", (Object)firstPart.errorCode());
                Thread.sleep(150L);
            } while ((firstPart = (PartitionMetadata)(partitionMetadata = AdminUtils.fetchTopicMetadataFromZk((String)topic, (ZkUtils)zkUtils).partitionsMetadata()).head()).errorCode() != 0);
            int n = ((BrokerEndPoint)firstPart.leader().get()).id();
            return n;
        }
    }

    public int getBrokerId(KafkaServer server) {
        return server.config().brokerId();
    }

    public void prepare(int numKafkaServers, Properties additionalServerProperties) {
        int i;
        this.additionalServerProperties = additionalServerProperties;
        File tempDir = new File(System.getProperty("java.io.tmpdir"));
        this.tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + UUID.randomUUID().toString());
        Assert.assertTrue((String)"cannot create zookeeper temp dir", (boolean)this.tmpZkDir.mkdirs());
        this.tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + UUID.randomUUID().toString());
        Assert.assertTrue((String)"cannot create kafka temp dir", (boolean)this.tmpKafkaParent.mkdirs());
        this.tmpKafkaDirs = new ArrayList<File>(numKafkaServers);
        for (i = 0; i < numKafkaServers; ++i) {
            File tmpDir = new File(this.tmpKafkaParent, "server-" + i);
            Assert.assertTrue((String)"cannot create kafka temp dir", (boolean)tmpDir.mkdir());
            this.tmpKafkaDirs.add(tmpDir);
        }
        this.zookeeper = null;
        this.brokers = null;
        try {
            LOG.info("Starting Zookeeper");
            this.zookeeper = new TestingServer(-1, this.tmpZkDir);
            this.zookeeperConnectionString = this.zookeeper.getConnectString();
            LOG.info("Starting KafkaServer");
            this.brokers = new ArrayList<KafkaServer>(numKafkaServers);
            for (i = 0; i < numKafkaServers; ++i) {
                this.brokers.add(this.getKafkaServer(i, this.tmpKafkaDirs.get(i)));
                SocketServer socketServer = this.brokers.get(i).socketServer();
                this.brokerConnectionString = this.brokerConnectionString + NetUtils.hostAndPortToUrlString((String)"localhost", (int)this.brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
            }
            LOG.info("ZK and KafkaServer started.");
        }
        catch (Throwable t) {
            t.printStackTrace();
            Assert.fail((String)("Test setup failed: " + t.getMessage()));
        }
        this.standardProps = new Properties();
        this.standardProps.setProperty("zookeeper.connect", this.zookeeperConnectionString);
        this.standardProps.setProperty("bootstrap.servers", this.brokerConnectionString);
        this.standardProps.setProperty("group.id", "flink-tests");
        this.standardProps.setProperty("auto.commit.enable", "false");
        this.standardProps.setProperty("zookeeper.session.timeout.ms", "30000");
        this.standardProps.setProperty("zookeeper.connection.timeout.ms", "30000");
        this.standardProps.setProperty("auto.offset.reset", "earliest");
        this.standardProps.setProperty("fetch.message.max.bytes", "256");
    }

    public void shutdown() {
        for (KafkaServer broker : this.brokers) {
            if (broker == null) continue;
            broker.shutdown();
        }
        this.brokers.clear();
        if (this.zookeeper != null) {
            try {
                this.zookeeper.stop();
            }
            catch (Exception e) {
                LOG.warn("ZK.stop() failed", (Throwable)e);
            }
            this.zookeeper = null;
        }
        if (this.tmpKafkaParent != null && this.tmpKafkaParent.exists()) {
            try {
                FileUtils.deleteDirectory((File)this.tmpKafkaParent);
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        if (this.tmpZkDir != null && this.tmpZkDir.exists()) {
            try {
                FileUtils.deleteDirectory((File)this.tmpZkDir);
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
    }

    public ZkUtils getZkUtils() {
        ZkClient creator = new ZkClient(this.zookeeperConnectionString, Integer.valueOf(this.standardProps.getProperty("zookeeper.session.timeout.ms")).intValue(), Integer.valueOf(this.standardProps.getProperty("zookeeper.connection.timeout.ms")).intValue(), (ZkSerializer)new ZooKeeperStringSerializer());
        return ZkUtils.apply((ZkClient)creator, (boolean)false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties topicConfig) {
        LOG.info("Creating topic {}", (Object)topic);
        try (ZkUtils zkUtils = this.getZkUtils();){
            AdminUtils.createTopic((ZkUtils)zkUtils, (String)topic, (int)numberOfPartitions, (int)replicationFactor, (Properties)topicConfig);
        }
        long deadline = System.currentTimeMillis() + 30000L;
        do {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            ZkUtils checkZKConn = this.getZkUtils();
            if (AdminUtils.topicExists((ZkUtils)checkZKConn, (String)topic)) {
                checkZKConn.close();
                return;
            }
            checkZKConn.close();
        } while (System.currentTimeMillis() < deadline);
        Assert.fail((String)"Test topic could not be created");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void deleteTestTopic(String topic) {
        try (ZkUtils zkUtils = this.getZkUtils();){
            LOG.info("Deleting topic {}", (Object)topic);
            ZkClient zk = new ZkClient(this.zookeeperConnectionString, Integer.valueOf(this.standardProps.getProperty("zookeeper.session.timeout.ms")).intValue(), Integer.valueOf(this.standardProps.getProperty("zookeeper.connection.timeout.ms")).intValue(), (ZkSerializer)new ZooKeeperStringSerializer());
            AdminUtils.deleteTopic((ZkUtils)zkUtils, (String)topic);
            zk.close();
        }
    }

    protected KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws Exception {
        Properties kafkaProperties = new Properties();
        kafkaProperties.put("advertised.host.name", "localhost");
        kafkaProperties.put("broker.id", Integer.toString(brokerId));
        kafkaProperties.put("log.dir", tmpFolder.toString());
        kafkaProperties.put("zookeeper.connect", this.zookeeperConnectionString);
        kafkaProperties.put("message.max.bytes", String.valueOf(0x3200000));
        kafkaProperties.put("replica.fetch.max.bytes", String.valueOf(0x3200000));
        kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
        kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
        if (this.additionalServerProperties != null) {
            kafkaProperties.putAll((Map<?, ?>)this.additionalServerProperties);
        }
        int numTries = 5;
        for (int i = 1; i <= 5; ++i) {
            int kafkaPort = NetUtils.getAvailablePort();
            kafkaProperties.put("port", Integer.toString(kafkaPort));
            KafkaConfig kafkaConfig = new KafkaConfig((Map)kafkaProperties);
            try {
                Option stringNone = Option.apply(null);
                KafkaServer server = new KafkaServer(kafkaConfig, (Time)SystemTime$.MODULE$, stringNone);
                server.startup();
                return server;
            }
            catch (KafkaException e) {
                if (!(e.getCause() instanceof BindException)) {
                    throw e;
                }
                LOG.info("Port conflict when starting Kafka Broker. Retrying...");
                continue;
            }
        }
        throw new Exception("Could not start Kafka after 5 retries due to port conflicts.");
    }
}

