package org.apache.beam.runners.spark.translation.streaming.utils;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster.class */
public class EmbeddedKafkaCluster {
    private static final Logger LOG = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
    private final List<Integer> ports;
    private final String zkConnection;
    private final Properties baseProperties;
    private final String brokerList;
    private final List<KafkaServerStartable> brokers;
    private final List<File> logDirs;

    /* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster$EmbeddedZookeeper.class */
    /**
     * In-process ZooKeeper server bound to {@code 127.0.0.1}.
     *
     * <p>{@link #startup()} creates temporary snapshot and log directories and starts the server;
     * {@link #shutdown()} stops it and removes those directories again.
     */
    public static class EmbeddedZookeeper {
        private int port;
        private int tickTime;
        private ServerCnxnFactory factory;
        private File snapshotDir;
        private File logDir;

        /** Creates a server on an automatically selected free port with a tick time of 500. */
        public EmbeddedZookeeper() {
            this(-1);
        }

        /**
         * Creates a server with a tick time of 500.
         *
         * @param port port to bind, or {@code -1} to pick a free port
         */
        EmbeddedZookeeper(int port) {
            this(port, 500);
        }

        /**
         * Creates a server.
         *
         * @param port port to bind, or {@code -1} to pick a free port
         * @param tickTime tick time handed to the {@link ZooKeeperServer}
         */
        EmbeddedZookeeper(int port, int tickTime) {
            this.port = resolvePort(port);
            this.tickTime = tickTime;
        }

        /** Maps the {@code -1} placeholder to a currently free port; other values pass through. */
        private static int resolvePort(int port) {
            return port == -1 ? TestUtils.getAvailablePort() : port;
        }

        /**
         * Starts the ZooKeeper server on {@code 127.0.0.1:<port>}.
         *
         * @throws IOException if the connection factory cannot be created or started, or if the
         *     calling thread is interrupted while starting
         */
        public void startup() throws IOException {
            // setPort(-1) may have reset the port after construction, so resolve it again here.
            if (this.port == -1) {
                this.port = TestUtils.getAvailablePort();
            }
            this.factory =
                NIOServerCnxnFactory.createFactory(new InetSocketAddress("127.0.0.1", this.port), 1024);
            this.snapshotDir = TestUtils.constructTempDir("embedded-zk/snapshot");
            this.logDir = TestUtils.constructTempDir("embedded-zk/log");
            try {
                this.factory.startup(new ZooKeeperServer(this.snapshotDir, this.logDir, this.tickTime));
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can still observe it.
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }

        /**
         * Stops the server (if it was started) and removes its temporary directories.
         *
         * <p>Safe to call when {@link #startup()} was never invoked or failed part-way.
         */
        public void shutdown() {
            if (this.factory != null) {
                this.factory.shutdown();
            }
            deleteQuietly(this.snapshotDir);
            deleteQuietly(this.logDir);
        }

        /** Best-effort recursive delete; a missing or never-created directory is not an error. */
        private static void deleteQuietly(File dir) {
            if (dir == null) {
                // startup() never got far enough to create this directory.
                return;
            }
            try {
                TestUtils.deleteFile(dir);
            } catch (FileNotFoundException ignored) {
                // Already gone; nothing left to clean up.
            }
        }

        /** Returns the connection string in {@code 127.0.0.1:<port>} form. */
        public String getConnection() {
            return "127.0.0.1:" + this.port;
        }

        /** Sets the port; {@code -1} makes {@link #startup()} pick a free port. */
        public void setPort(int port) {
            this.port = port;
        }

        /** Sets the tick time used for the next {@link #startup()}. */
        public void setTickTime(int tickTime) {
            this.tickTime = tickTime;
        }

        /** Returns the configured port. */
        public int getPort() {
            return this.port;
        }

        /** Returns the configured tick time. */
        public int getTickTime() {
            return this.tickTime;
        }

        @Override
        @SideEffectFree
        public String toString() {
            return "EmbeddedZookeeper{connection=" + getConnection() + "}";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/utils/EmbeddedKafkaCluster$TestUtils.class */
    /** Static helpers for temporary directories, free ports and recursive file deletion. */
    public static final class TestUtils {
        private static final Random RANDOM = new Random();

        private TestUtils() {
        }

        /**
         * Creates a fresh directory under {@code java.io.tmpdir} named {@code dirPrefix} followed by
         * a random number, and registers it for deletion on JVM exit.
         *
         * @param dirPrefix relative path prefix of the directory; may contain path separators, in
         *     which case missing parent directories are created as well
         * @return the newly created directory
         * @throws RuntimeException if the directory could not be created (including the case where
         *     a directory of that name already exists)
         */
        static File constructTempDir(String dirPrefix) {
            File dir =
                new File(System.getProperty("java.io.tmpdir"), dirPrefix + RANDOM.nextInt(10000000));
            if (!dir.mkdirs()) {
                throw new RuntimeException("could not create temp directory: " + dir.getAbsolutePath());
            }
            dir.deleteOnExit();
            return dir;
        }

        /**
         * Asks the OS for an ephemeral port by briefly binding a {@link ServerSocket} to port 0.
         *
         * <p>The socket is closed before returning, so the port is only free at the time of the
         * call; another process may grab it before the caller binds.
         *
         * @return a port number that was free when probed
         * @throws IllegalStateException if no socket could be opened
         */
        static int getAvailablePort() {
            try (ServerSocket probe = new ServerSocket(0)) {
                return probe.getLocalPort();
            } catch (IOException e) {
                throw new IllegalStateException("Cannot find available port: " + e.getMessage(), e);
            }
        }

        /**
         * Deletes {@code file}; directories are deleted recursively.
         *
         * <p>Every child is attempted even if an earlier one could not be deleted, so as much as
         * possible is cleaned up.
         *
         * @param file file or directory to delete
         * @return {@code true} if {@code file} and everything below it was deleted
         * @throws FileNotFoundException if {@code file}, or a child encountered during recursion,
         *     does not exist
         */
        static boolean deleteFile(File file) throws FileNotFoundException {
            if (!file.exists()) {
                throw new FileNotFoundException(file.getAbsolutePath());
            }
            boolean childrenDeleted = true;
            if (file.isDirectory()) {
                // listFiles() returns null on I/O errors; fall through and let delete() report failure.
                File[] children = file.listFiles();
                if (children != null) {
                    for (File child : children) {
                        // Non-short-circuit so the remaining siblings are still attempted.
                        childrenDeleted &= deleteFile(child);
                    }
                }
            }
            return childrenDeleted && file.delete();
        }
    }

    /**
     * Creates a cluster with no extra broker properties.
     *
     * @param zkConnection ZooKeeper connection string the brokers should use
     */
    private EmbeddedKafkaCluster(String zkConnection) {
        this(zkConnection, new Properties());
    }

    /**
     * Creates a single-broker cluster whose port is chosen automatically.
     *
     * @param zkConnection ZooKeeper connection string the broker should use
     * @param baseProperties properties copied into the broker configuration before the
     *     cluster-specific settings are applied
     */
    public EmbeddedKafkaCluster(String zkConnection, Properties baseProperties) {
        // A single -1 entry requests one broker on a free port.
        this(zkConnection, baseProperties, Collections.singletonList(-1));
    }

    /**
     * Creates a cluster with one broker per entry in {@code ports}.
     *
     * @param zkConnection ZooKeeper connection string the brokers should use
     * @param baseProperties properties copied into every broker configuration before the
     *     cluster-specific settings are applied
     * @param ports broker ports; an entry of {@code -1} is replaced by a free port
     */
    private EmbeddedKafkaCluster(String zkConnection, Properties baseProperties, List<Integer> ports) {
        this.zkConnection = zkConnection;
        this.ports = resolvePorts(ports);
        this.baseProperties = baseProperties;
        this.brokers = new ArrayList<>();
        this.logDirs = new ArrayList<>();
        // Derived from the resolved ports, so it must be computed after this.ports is set.
        this.brokerList = constructBrokerList(this.ports);
    }

    /**
     * Returns a new list in which every {@code -1} placeholder is replaced by a free port.
     *
     * @param ports requested ports, in broker order
     * @return resolved ports, same size and order as {@code ports}
     */
    private static List<Integer> resolvePorts(List<Integer> ports) {
        List<Integer> resolved = new ArrayList<>(ports.size());
        for (Integer port : ports) {
            resolved.add(resolvePort(port));
        }
        return resolved;
    }

    /** Maps the {@code -1} placeholder to a currently free port; any other value passes through. */
    private static int resolvePort(int port) {
        if (port != -1) {
            return port;
        }
        return TestUtils.getAvailablePort();
    }

    /**
     * Builds the comma-separated broker list, one {@code 127.0.0.1:<port>} entry per port.
     *
     * @param ports broker ports, in order
     * @return the joined list, or an empty string when {@code ports} is empty
     */
    private static String constructBrokerList(List<Integer> ports) {
        StringBuilder joined = new StringBuilder();
        String separator = "";
        for (Integer port : ports) {
            joined.append(separator).append("127.0.0.1:").append(port);
            // Every entry after the first is preceded by a comma.
            separator = ",";
        }
        return joined.toString();
    }

    /**
     * Starts one Kafka broker per configured port on {@code 127.0.0.1}.
     *
     * <p>Broker ids are assigned as {@code 1..n} in port order. Each broker gets its own temporary
     * log directory, which {@link #shutdown()} removes.
     */
    public void startup() {
        for (int i = 0; i < this.ports.size(); i++) {
            String port = Integer.toString(this.ports.get(i));
            File logDir = TestUtils.constructTempDir("kafka-local");
            // Track the directory before starting the broker so shutdown() still removes it
            // when broker startup throws.
            this.logDirs.add(logDir);

            Properties properties = new Properties();
            properties.putAll(this.baseProperties);
            // Cluster-specific settings deliberately override anything from baseProperties.
            properties.setProperty("zookeeper.connect", this.zkConnection);
            properties.setProperty("broker.id", String.valueOf(i + 1));
            properties.setProperty("advertised.host.name", "127.0.0.1");
            properties.setProperty("host.name", "127.0.0.1");
            properties.setProperty("advertised.port", port);
            properties.setProperty("port", port);
            properties.setProperty("log.dirs", logDir.getAbsolutePath());
            properties.setProperty("offsets.topic.num.partitions", "1");
            properties.setProperty("offsets.topic.replication.factor", "1");
            properties.setProperty("log.flush.interval.messages", String.valueOf(1));

            this.brokers.add(startBroker(properties));
        }
    }

    /**
     * Creates a broker from {@code properties} and starts it.
     *
     * @param properties complete broker configuration
     * @return the broker, after its {@code startup()} has been called
     */
    private static KafkaServerStartable startBroker(Properties properties) {
        KafkaConfig config = new KafkaConfig(properties);
        KafkaServerStartable broker = new KafkaServerStartable(config);
        broker.startup();
        return broker;
    }

    /**
     * Returns a fresh {@link Properties} holding the base properties plus
     * {@code bootstrap.servers} set to this cluster's broker list.
     */
    public Properties getProps() {
        Properties clientProps = new Properties();
        clientProps.putAll(baseProperties);
        clientProps.setProperty("bootstrap.servers", brokerList);
        return clientProps;
    }

    /** Returns the comma-separated {@code 127.0.0.1:<port>} list of this cluster's brokers. */
    public String getBrokerList() {
        return brokerList;
    }

    /**
     * Returns the resolved broker ports, in broker order.
     *
     * @return a read-only view; the broker list was derived from these ports at construction, so
     *     callers must not be able to change them afterwards
     */
    public List<Integer> getPorts() {
        return Collections.unmodifiableList(this.ports);
    }

    /** Returns the ZooKeeper connection string this cluster was created with. */
    public String getZkConnection() {
        return zkConnection;
    }

    /**
     * Shuts down every started broker and deletes the temporary log directories.
     *
     * <p>Failures are logged and do not stop the remaining cleanup. The tracked brokers and
     * directories are forgotten afterwards, so calling this method again is a no-op.
     */
    public void shutdown() {
        for (KafkaServerStartable broker : this.brokers) {
            try {
                broker.shutdown();
            } catch (Exception e) {
                LOG.warn("{}", e.getMessage(), e);
            }
        }
        this.brokers.clear();

        for (File logDir : this.logDirs) {
            try {
                if (!TestUtils.deleteFile(logDir)) {
                    // deleteFile reports partial failure via its return value, not an exception.
                    LOG.warn("Could not fully delete Kafka log directory {}", logDir.getAbsolutePath());
                }
            } catch (FileNotFoundException e) {
                LOG.warn("{}", e.getMessage(), e);
            }
        }
        this.logDirs.clear();
    }

    /** Returns a short description containing the broker list. */
    @Override
    @SideEffectFree
    public String toString() {
        StringBuilder text = new StringBuilder("EmbeddedKafkaCluster{brokerList='");
        text.append(brokerList).append("'}");
        return text.toString();
    }
}
