package org.apache.apex.malhar.kafka;

import com.google.common.base.Throwables;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.ZKStringSerializer$;
import kafka.utils.ZkUtils;
import kafka.zk.EmbeddedZookeeper;
import org.I0Itec.zkclient.ZkClient;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/apache/apex/malhar/kafka/EmbeddedKafka.class */
/**
 * Test fixture that runs a single Kafka broker together with an embedded
 * ZooKeeper instance, and offers small helpers to create topics, publish
 * string messages and read them back.
 *
 * <p>Typical usage: {@link #start()}, {@link #createTopic(String)},
 * {@link #publish(String, List)}, {@link #consume(String, int)}, {@link #stop()}.
 * The broker log directory is the fixed path {@value #KAFKA_PATH}; it is deleted
 * on both start and stop.
 */
public class EmbeddedKafka {
    /** Directory handed to the broker as {@code log.dirs}; wiped on start and stop. */
    private static final String KAFKA_PATH = "/tmp/kafka-test";
    /** Session and connection timeout, in milliseconds, passed to the ZooKeeper client. */
    private static final int ZK_TIMEOUT_MS = 30000;
    /** Timeout, in milliseconds, passed to the topic metadata propagation wait. */
    private static final long METADATA_TIMEOUT_MS = 30000L;

    private ZkClient zkClient;
    private ZkUtils zkUtils;
    private final String BROKERHOST = "127.0.0.1";
    // Default only; replaced by an OS-assigned free port in start().
    private String BROKERPORT = "9092";
    private EmbeddedZookeeper zkServer;
    private KafkaServer kafkaServer;

    /**
     * Returns the broker address in {@code host:port} form, suitable for
     * {@code bootstrap.servers}.
     *
     * @return the broker host and port joined by a colon
     */
    public String getBroker() {
        return this.BROKERHOST + ":" + this.BROKERPORT;
    }

    /**
     * Starts the embedded ZooKeeper and a Kafka broker listening on a free local port.
     *
     * @throws IOException declared for callers; an {@code IOException} raised inside
     *         this method is rethrown through {@code Throwables.propagate}
     */
    public void start() throws IOException {
        try {
            // Bind to port 0 so the OS picks a free port, then release it for the broker.
            try (ServerSocket serverSocket = new ServerSocket(0)) {
                this.BROKERPORT = Integer.toString(serverSocket.getLocalPort());
            }

            this.zkServer = new EmbeddedZookeeper();
            String zkConnect = this.BROKERHOST + ":" + this.zkServer.port();
            this.zkClient = new ZkClient(zkConnect, ZK_TIMEOUT_MS, ZK_TIMEOUT_MS, ZKStringSerializer$.MODULE$);
            this.zkUtils = ZkUtils.apply(this.zkClient, false);

            // Remove logs left behind by an earlier run before the broker opens the directory.
            cleanupDir();

            Properties brokerProps = new Properties();
            brokerProps.setProperty("zookeeper.connect", zkConnect);
            brokerProps.setProperty("broker.id", "0");
            brokerProps.setProperty("log.dirs", KAFKA_PATH);
            brokerProps.setProperty("listeners", "PLAINTEXT://" + getBroker());
            this.kafkaServer = TestUtils.createServer(new KafkaConfig(brokerProps), new MockTime());
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Shuts down the broker, the ZooKeeper client and the ZooKeeper server, then
     * deletes the broker log directory.
     *
     * <p>Components that were never started are skipped, and each later step still
     * runs when an earlier one throws, so this is safe to call after a failed
     * {@link #start()}.
     *
     * @throws IOException if the log directory cannot be deleted
     */
    public void stop() throws IOException {
        try {
            if (this.kafkaServer != null) {
                this.kafkaServer.shutdown();
            }
        } finally {
            try {
                if (this.zkClient != null) {
                    this.zkClient.close();
                }
            } finally {
                try {
                    if (this.zkServer != null) {
                        this.zkServer.shutdown();
                    }
                } finally {
                    cleanupDir();
                }
            }
        }
    }

    /** Recursively deletes the broker log directory. */
    private void cleanupDir() throws IOException {
        FileUtils.deleteDirectory(new File(KAFKA_PATH));
    }

    /**
     * Creates a topic with one partition and replication factor one, then waits for
     * the broker to report its metadata.
     *
     * @param topic name of the topic to create
     */
    public void createTopic(String topic) {
        AdminUtils.createTopic(this.zkUtils, topic, 1, 1, new Properties());
        waitForMetadata(topic);
    }

    /**
     * Sends each message to the topic as a UTF-8 encoded value with no key, closes the
     * producer, and then waits for the topic metadata to be available on the broker.
     *
     * @param topic    destination topic
     * @param messages values to publish, in iteration order
     */
    public void publish(String topic, List<String> messages) {
        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", getBroker());
        producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps)) {
            for (String message : messages) {
                producer.send(new ProducerRecord<Integer, byte[]>(topic, message.getBytes(StandardCharsets.UTF_8)));
            }
        }

        waitForMetadata(topic);
    }

    /**
     * Reads messages from the topic starting at the earliest offset.
     *
     * @param topic     topic to read from
     * @param timeoutMs value passed to the consumer's single {@code poll} call
     * @return the values returned by that poll, decoded as UTF-8 strings
     */
    public List<String> consume(String topic, int timeoutMs) {
        return consume(topic, timeoutMs, true);
    }

    /**
     * Subscribes a fresh consumer to the topic, performs a single poll and returns the
     * record values decoded as UTF-8 (the encoding used by {@link #publish(String, List)}).
     *
     * @param topic     topic to read from
     * @param timeoutMs value passed to the consumer's single {@code poll} call
     * @param earliest  {@code true} sets {@code auto.offset.reset} to {@code earliest},
     *                  {@code false} sets it to {@code latest}
     * @return the values returned by the poll, possibly empty
     */
    public List<String> consume(String topic, int timeoutMs, boolean earliest) {
        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", getBroker());
        consumerProps.setProperty("group.id", "group0");
        consumerProps.setProperty("client.id", "consumer0");
        consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        consumerProps.setProperty("auto.offset.reset", earliest ? "earliest" : "latest");

        List<String> messages = new ArrayList<>();
        KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
        try {
            consumer.subscribe(Arrays.asList(topic));
            for (ConsumerRecord<Integer, byte[]> record : consumer.poll(timeoutMs)) {
                messages.add(new String(record.value(), StandardCharsets.UTF_8));
            }
        } finally {
            // Always release the consumer, even when subscribe or poll fails.
            consumer.close();
        }
        return messages;
    }

    /**
     * Calls {@code TestUtils.waitUntilMetadataIsPropagated} for the given topic against
     * this fixture's single broker.
     */
    private void waitForMetadata(String topic) {
        List<KafkaServer> servers = new ArrayList<>();
        servers.add(this.kafkaServer);
        // The literal 0 is presumably the partition index; the topic is created with one partition.
        TestUtils.waitUntilMetadataIsPropagated(JavaConversions.asScalaBuffer(servers), topic, 0, METADATA_TIMEOUT_MS);
    }
}
