package org.apache.drill.exec.store.kafka;

import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.zk.KafkaZkClient;
import org.apache.drill.categories.KafkaStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.ZookeeperTestUtil;
import org.apache.drill.exec.store.kafka.cluster.EmbeddedKafkaCluster;
import org.apache.drill.exec.store.kafka.decoders.MessageReaderFactoryTest;
import org.apache.drill.test.BaseTest;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

@RunWith(Suite.class)
@Category({KafkaStorageTest.class, SlowTest.class})
@Suite.SuiteClasses({KafkaQueriesTest.class, MessageIteratorTest.class, MessageReaderFactoryTest.class, KafkaFilterPushdownTest.class})
/* loaded from: input_file:org/apache/drill/exec/store/kafka/TestKafkaSuit.class */
public class TestKafkaSuit extends BaseTest {
    private static final String LOGIN_CONF_RESOURCE_PATHNAME = "login.conf";
    public static EmbeddedKafkaCluster embeddedKafkaCluster;
    private static KafkaZkClient zkClient;
    static final int NUM_JSON_MSG = 10;
    private static final int CONN_TIMEOUT = 8000;
    private static final int SESSION_TIMEOUT = 10000;
    private static final Logger logger = LoggerFactory.getLogger(TestKafkaSuit.class);
    private static final AtomicInteger initCount = new AtomicInteger(0);
    private static volatile boolean runningSuite = true;

    @BeforeClass
    public static void initKafka() throws Exception {
        synchronized (TestKafkaSuit.class) {
            if (initCount.get() == 0) {
                ZookeeperTestUtil.setZookeeperSaslTestConfigProps();
                System.setProperty("java.security.auth.login.config", ClassLoader.getSystemResource(LOGIN_CONF_RESOURCE_PATHNAME).getFile());
                embeddedKafkaCluster = new EmbeddedKafkaCluster();
                zkClient = KafkaZkClient.apply(embeddedKafkaCluster.getZkServer().getConnectionString(), false, SESSION_TIMEOUT, CONN_TIMEOUT, 0, Time.SYSTEM, "kafka.server", "SessionExpireListener", Option.empty(), Option.empty());
                createTopicHelper(TestQueryConstants.JSON_TOPIC, 1);
                createTopicHelper(TestQueryConstants.AVRO_TOPIC, 1);
                KafkaMessageGenerator kafkaMessageGenerator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
                KafkaMessageGenerator kafkaMessageGenerator2 = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), KafkaAvroSerializer.class);
                kafkaMessageGenerator.populateJsonMsgIntoKafka(TestQueryConstants.JSON_TOPIC, NUM_JSON_MSG);
                kafkaMessageGenerator2.populateAvroMsgIntoKafka(TestQueryConstants.AVRO_TOPIC, NUM_JSON_MSG);
            }
            initCount.incrementAndGet();
            runningSuite = true;
        }
        logger.info("Initialized Embedded Zookeeper and Kafka");
    }

    public static boolean isRunningSuite() {
        return runningSuite;
    }

    @AfterClass
    public static void tearDownCluster() {
        synchronized (TestKafkaSuit.class) {
            if (initCount.decrementAndGet() == 0) {
                if (zkClient != null) {
                    zkClient.close();
                    zkClient = null;
                }
                if (embeddedKafkaCluster != null && !embeddedKafkaCluster.getBrokers().isEmpty()) {
                    embeddedKafkaCluster.shutDownCluster();
                    embeddedKafkaCluster = null;
                }
            }
        }
    }

    public static void createTopicHelper(String str, int i) throws ExecutionException, InterruptedException {
        AdminClient initAdminClient = initAdminClient();
        Throwable th = null;
        try {
            try {
                NewTopic newTopic = new NewTopic(str, i, (short) 1);
                HashMap hashMap = new HashMap();
                hashMap.put("message.timestamp.type", "CreateTime");
                hashMap.put("retention.ms", "-1");
                newTopic.configs(hashMap);
                initAdminClient.createTopics(Collections.singletonList(newTopic)).all().get();
                if (initAdminClient != null) {
                    if (0 == 0) {
                        initAdminClient.close();
                        return;
                    }
                    try {
                        initAdminClient.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (initAdminClient != null) {
                if (th != null) {
                    try {
                        initAdminClient.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    initAdminClient.close();
                }
            }
            throw th4;
        }
    }

    private static AdminClient initAdminClient() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", embeddedKafkaCluster.getKafkaBrokerList());
        return AdminClient.create(properties);
    }
}
