package org.apache.apex.malhar.kafka;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.CancelledKeyException;
import java.util.Properties;
import kafka.admin.TopicCommand;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZkUtils;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/malhar/kafka/KafkaOperatorTestBase.class */
/**
 * Base class for Kafka operator tests that need live infrastructure.
 *
 * <p>Before the test class runs, two embedded ZooKeeper servers and two Kafka brokers
 * (one per cluster index, 0 and 1) are started on ports picked at class-load time;
 * after the test class they are shut down again. All on-disk state is kept below
 * {@link #baseDir}.
 */
public class KafkaOperatorTestBase {
    public static final String END_TUPLE = "END_TUPLE";
    /** ZooKeeper client port per cluster index. */
    public static final int[] TEST_ZOOKEEPER_PORT;
    /** Kafka broker port per cluster index. */
    public static final int[] TEST_KAFKA_BROKER_PORT;
    public static final String TEST_TOPIC = "testtopic";
    // Not read or written anywhere in this class; kept for subclasses.
    public static int testCounter = 0;
    static final Logger logger;
    /** Number of ZooKeeper/Kafka clusters managed by this class. */
    private static final int CLUSTER_COUNT = 2;
    private static KafkaServerStartable[] broker;
    private static ServerCnxnFactory[] zkFactory;
    private static ZooKeeperServer[] zkServer;
    /** Parent directory for all ZooKeeper and Kafka data directories. */
    public static String baseDir;
    private static final String zkBaseDir = "zookeeper-server-data";
    private static final String kafkaBaseDir = "kafka-server-data";
    private static final String[] zkdir;
    private static final String[] kafkadir;
    /** When true, {@link #createTopic(int, String)} creates topics with two partitions. */
    protected boolean hasMultiPartition = false;
    // Only stored by this class; it is not consulted by any method here.
    protected boolean hasMultiCluster = false;

    static {
        // Reserve ephemeral ports by binding to port 0, remembering the port the OS
        // picked, and releasing the socket again: ZooKeeper ports first, then Kafka.
        final int portCount = 2 * CLUSTER_COUNT;
        ServerSocket[] probes = new ServerSocket[portCount];
        int[] ports = new int[portCount];
        try {
            for (int p = 0; p < portCount; p++) {
                probes[p] = new ServerSocket(0);
                ports[p] = probes[p].getLocalPort();
            }
            for (ServerSocket probe : probes) {
                probe.close();
            }
        } catch (IOException e) {
            // Release whatever was opened before failing class initialization;
            // closing an already closed socket is harmless.
            for (ServerSocket probe : probes) {
                if (probe != null) {
                    try {
                        probe.close();
                    } catch (IOException closeFailure) {
                        e.addSuppressed(closeFailure);
                    }
                }
            }
            throw new RuntimeException(e);
        }
        TEST_ZOOKEEPER_PORT = new int[]{ports[0], ports[1]};
        TEST_KAFKA_BROKER_PORT = new int[]{ports[2], ports[3]};
        logger = LoggerFactory.getLogger(KafkaOperatorTestBase.class);
        broker = new KafkaServerStartable[CLUSTER_COUNT];
        zkFactory = new ServerCnxnFactory[CLUSTER_COUNT];
        zkServer = new ZooKeeperServer[CLUSTER_COUNT];
        baseDir = "target";
        zkdir = new String[]{"zookeeper-server-data/1", "zookeeper-server-data/2"};
        kafkadir = new String[]{"kafka-server-data/1/1", "kafka-server-data/1/2"};
    }

    /**
     * ZooKeeper server variant whose JMX registration hooks are no-ops.
     * Every constructor simply forwards its arguments to the superclass.
     */
    public static class TestZookeeperServer extends ZooKeeperServer {
        public TestZookeeperServer() {
        }

        public TestZookeeperServer(File snapDir, File logDir, int tickTime) throws IOException {
            super(snapDir, logDir, tickTime);
        }

        public TestZookeeperServer(FileTxnSnapLog txnLogFactory, ZooKeeperServer.DataTreeBuilder treeBuilder) throws IOException {
            super(txnLogFactory, treeBuilder);
        }

        public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, ZooKeeperServer.DataTreeBuilder treeBuilder) throws IOException {
            super(txnLogFactory, tickTime, treeBuilder);
        }

        public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, ZooKeeperServer.DataTreeBuilder treeBuilder, ZKDatabase zkDb) {
            super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
        }

        /** Intentionally empty: no JMX registration in tests. */
        protected void registerJMX() {
        }

        /** Intentionally empty: nothing was registered, so nothing to unregister. */
        protected void unregisterJMX() {
        }
    }

    /**
     * Starts the ZooKeeper server of one cluster and waits two seconds for it to settle.
     * Failures are logged, not propagated.
     *
     * @param clusterId cluster index (0 or 1)
     */
    public static void startZookeeper(int clusterId) {
        try {
            // The same directory serves as both constructor directory arguments.
            File dataDir = new File(baseDir, zkdir[clusterId]);
            zkServer[clusterId] = new TestZookeeperServer(dataDir, dataDir, 2000);
            zkFactory[clusterId] = new NIOServerCnxnFactory();
            zkFactory[clusterId].configure(new InetSocketAddress(TEST_ZOOKEEPER_PORT[clusterId]), 100);
            zkFactory[clusterId].startup(zkServer[clusterId]);
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently eating it.
            Thread.currentThread().interrupt();
            logger.error("Interrupted while starting ZooKeeper for cluster {}", clusterId, e);
        } catch (Exception e) {
            // Log the exception itself: getLocalizedMessage() alone may be null and drops the stack trace.
            logger.error("Failed to start ZooKeeper for cluster {}", clusterId, e);
        }
    }

    /** Shuts down every started ZooKeeper server and connection factory, then resets the bookkeeping arrays. */
    public static void stopZookeeper() {
        for (ZooKeeperServer server : zkServer) {
            if (server != null) {
                server.shutdown();
            }
        }
        for (ServerCnxnFactory factory : zkFactory) {
            if (factory != null) {
                factory.closeAll();
                factory.shutdown();
            }
        }
        zkServer = new ZooKeeperServer[CLUSTER_COUNT];
        zkFactory = new ServerCnxnFactory[CLUSTER_COUNT];
    }

    /**
     * Configures and starts the Kafka broker of one cluster.
     *
     * @param clusterId cluster index (0 or 1); selects the port, log directory and ZooKeeper instance
     * @param brokerId  broker index within the cluster; only used to build {@code broker.id}
     */
    public static void startKafkaServer(int clusterId, int brokerId) {
        Properties config = new Properties();
        // String concatenation, not addition: cluster 1 / broker 0 yields "100".
        config.setProperty("broker.id", "" + (clusterId * 10) + brokerId);
        config.setProperty("log.dirs", new File(baseDir, kafkadir[clusterId]).toString());
        config.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterId]);
        config.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterId]);
        config.setProperty("default.replication.factor", "1");
        config.setProperty("log.flush.interval.messages", "50000");
        broker[clusterId] = new KafkaServerStartable(new KafkaConfig(config));
        broker[clusterId].startup();
    }

    /** Wipes the Kafka data directory and starts one broker for each of the two clusters. */
    public static void startKafkaServer() {
        FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
        startKafkaServer(0, 0);
        startKafkaServer(1, 0);
    }

    /** Shuts down every running broker, waits for it to finish, and clears its slot. */
    public static void stopKafkaServer() {
        for (int i = 0; i < broker.length; i++) {
            KafkaServerStartable running = broker[i];
            if (running != null) {
                running.shutdown();
                running.awaitShutdown();
                broker[i] = null;
            }
        }
    }

    /** Brings up ZooKeeper and then Kafka before the first test of the class. */
    @BeforeClass
    public static void beforeTest() {
        try {
            startZookeeper();
            startKafkaServer();
        } catch (CancelledKeyException e) {
            logger.debug("LSHIL {}", e.getLocalizedMessage());
        }
    }

    /** Wipes the ZooKeeper data directory and starts the ZooKeeper server of both clusters. */
    public static void startZookeeper() {
        FileUtils.deleteQuietly(new File(baseDir, zkBaseDir));
        startZookeeper(0);
        startZookeeper(1);
    }

    /**
     * Creates a topic with replication factor 1 on the given cluster. The topic gets two
     * partitions when {@link #hasMultiPartition} is set, otherwise one.
     *
     * @param clusterId cluster index (0 or 1)
     * @param topicName name of the topic to create
     */
    public void createTopic(int clusterId, String topicName) {
        String zkConnect = "localhost:" + TEST_ZOOKEEPER_PORT[clusterId];
        String[] args = {
            "--zookeeper", zkConnect,
            "--replication-factor", "1",
            "--partitions", this.hasMultiPartition ? "2" : "1",
            "--topic", topicName,
            "--create"
        };
        ZkUtils zkUtils = ZkUtils.apply(zkConnect, 30000, 30000, false);
        try {
            TopicCommand.createTopic(zkUtils, new TopicCommand.TopicCommandOptions(args));
        } finally {
            // Release the ZooKeeper client connection opened for this command.
            zkUtils.close();
        }
    }

    /**
     * Tears down Kafka and then ZooKeeper after the last test of the class. The two steps
     * are guarded separately so a broker shutdown failure cannot leave ZooKeeper running.
     */
    @AfterClass
    public static void afterTest() {
        try {
            stopKafkaServer();
        } catch (Exception e) {
            logger.debug("LSHIL {}", e.getLocalizedMessage());
        }
        try {
            stopZookeeper();
        } catch (Exception e) {
            logger.debug("LSHIL {}", e.getLocalizedMessage());
        }
    }

    /**
     * @param hasMultiPartition whether topics created afterwards get two partitions instead of one
     */
    public void setHasMultiPartition(boolean hasMultiPartition) {
        this.hasMultiPartition = hasMultiPartition;
    }

    /**
     * @param hasMultiCluster value stored in {@link #hasMultiCluster}
     */
    public void setHasMultiCluster(boolean hasMultiCluster) {
        this.hasMultiCluster = hasMultiCluster;
    }
}
