package org.apache.flink.streaming.connectors.kafka;

import java.io.File;
import java.net.BindException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.commons.collections.list.UnmodifiableList;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.mutable.ArraySeq;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.class */
public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
    protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);
    private File tmpZkDir;
    private File tmpKafkaParent;
    private List<File> tmpKafkaDirs;
    private TestingServer zookeeper;
    private String zookeeperConnectionString;
    private Properties standardProps;
    private KafkaTestEnvironment.Config config;
    private static final int DELETE_TIMEOUT_SECONDS = 30;
    private final List<KafkaServer> brokers = new ArrayList();
    private String brokerConnectionString = "";
    private FlinkKafkaProducer.Semantic producerSemantic = FlinkKafkaProducer.Semantic.EXACTLY_ONCE;
    private int zkTimeout = 30000;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl$KafkaOffsetHandlerImpl.class */
    private class KafkaOffsetHandlerImpl implements KafkaTestEnvironment.KafkaOffsetHandler {
        private final KafkaConsumer<byte[], byte[]> offsetClient;

        public KafkaOffsetHandlerImpl() {
            Properties properties = new Properties();
            properties.putAll(KafkaTestEnvironmentImpl.this.standardProps);
            properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            this.offsetClient = new KafkaConsumer<>(properties);
        }

        public Long getCommittedOffset(String str, int i) {
            OffsetAndMetadata committed = this.offsetClient.committed(new TopicPartition(str, i));
            if (committed != null) {
                return Long.valueOf(committed.offset());
            }
            return null;
        }

        public void setCommittedOffset(String str, int i, long j) {
            HashMap hashMap = new HashMap();
            hashMap.put(new TopicPartition(str, i), new OffsetAndMetadata(j));
            this.offsetClient.commitSync(hashMap);
        }

        public void close() {
            this.offsetClient.close();
        }
    }

    public void setProducerSemantic(FlinkKafkaProducer.Semantic semantic) {
        this.producerSemantic = semantic;
    }

    public void prepare(KafkaTestEnvironment.Config config) throws Exception {
        if (config.isSecureMode()) {
            config.setKafkaServersNumber(1);
            this.zkTimeout *= 15;
        }
        this.config = config;
        File file = new File(System.getProperty("java.io.tmpdir"));
        this.tmpZkDir = new File(file, "kafkaITcase-zk-dir-" + UUID.randomUUID().toString());
        Assert.assertTrue("cannot create zookeeper temp dir", this.tmpZkDir.mkdirs());
        this.tmpKafkaParent = new File(file, "kafkaITcase-kafka-dir-" + UUID.randomUUID().toString());
        Assert.assertTrue("cannot create kafka temp dir", this.tmpKafkaParent.mkdirs());
        this.tmpKafkaDirs = new ArrayList(config.getKafkaServersNumber());
        for (int i = 0; i < config.getKafkaServersNumber(); i++) {
            File file2 = new File(this.tmpKafkaParent, "server-" + i);
            Assert.assertTrue("cannot create kafka temp dir", file2.mkdir());
            this.tmpKafkaDirs.add(file2);
        }
        this.zookeeper = null;
        this.brokers.clear();
        this.zookeeper = new TestingServer(-1, this.tmpZkDir);
        this.zookeeperConnectionString = this.zookeeper.getConnectString();
        LOG.info("Starting Zookeeper with zookeeperConnectionString: {}", this.zookeeperConnectionString);
        LOG.info("Starting KafkaServer");
        ListenerName forSecurityProtocol = ListenerName.forSecurityProtocol(config.isSecureMode() ? SecurityProtocol.SASL_PLAINTEXT : SecurityProtocol.PLAINTEXT);
        for (int i2 = 0; i2 < config.getKafkaServersNumber(); i2++) {
            KafkaServer kafkaServer = getKafkaServer(i2, this.tmpKafkaDirs.get(i2));
            this.brokers.add(kafkaServer);
            this.brokerConnectionString += NetUtils.hostAndPortToUrlString("localhost", kafkaServer.socketServer().boundPort(forSecurityProtocol));
            this.brokerConnectionString += ",";
        }
        LOG.info("ZK and KafkaServer started.");
        this.standardProps = new Properties();
        this.standardProps.setProperty("bootstrap.servers", this.brokerConnectionString);
        this.standardProps.setProperty("group.id", "flink-tests");
        this.standardProps.setProperty("enable.auto.commit", "false");
        this.standardProps.setProperty("zookeeper.session.timeout.ms", String.valueOf(this.zkTimeout));
        this.standardProps.setProperty("zookeeper.connection.timeout.ms", String.valueOf(this.zkTimeout));
        this.standardProps.setProperty("auto.offset.reset", "earliest");
        this.standardProps.setProperty("max.partition.fetch.bytes", "256");
    }

    public void deleteTestTopic(String str) {
        LOG.info("Deleting topic {}", str);
        Properties secureProperties = getSecureProperties();
        secureProperties.putAll(getStandardProperties());
        String l = Long.toString(new Random().nextLong());
        secureProperties.put("client.id", l);
        AdminClient create = AdminClient.create(secureProperties);
        try {
            try {
                tryDelete(create, str);
                create.close(Duration.ofMillis(5000L));
                maybePrintDanglingThreadStacktrace(l);
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail(String.format("Delete test topic : %s failed, %s", str, e.getMessage()));
                create.close(Duration.ofMillis(5000L));
                maybePrintDanglingThreadStacktrace(l);
            }
        } catch (Throwable th) {
            create.close(Duration.ofMillis(5000L));
            maybePrintDanglingThreadStacktrace(l);
            throw th;
        }
    }

    private void tryDelete(AdminClient adminClient, String str) throws Exception {
        try {
            adminClient.deleteTopics(Collections.singleton(str)).all().get(30L, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            LOG.info("Did not receive delete topic response within {} seconds. Checking if it succeeded", Integer.valueOf(DELETE_TIMEOUT_SECONDS));
            if (((Set) adminClient.listTopics().names().get(30L, TimeUnit.SECONDS)).contains(str)) {
                throw new Exception("Topic still exists after timeout");
            }
        }
    }

    public void createTestTopic(String str, int i, int i2, Properties properties) {
        LOG.info("Creating topic {}", str);
        try {
            AdminClient create = AdminClient.create(getStandardProperties());
            Throwable th = null;
            try {
                try {
                    create.createTopics(Collections.singleton(new NewTopic(str, i, (short) i2))).all().get();
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                } catch (Throwable th3) {
                    th = th3;
                    throw th3;
                }
            } finally {
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Create test topic : " + str + " failed, " + e.getMessage());
        }
    }

    public Properties getStandardProperties() {
        return this.standardProps;
    }

    public Properties getSecureProperties() {
        Properties properties = new Properties();
        if (this.config.isSecureMode()) {
            properties.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
            properties.put("security.protocol", "SASL_PLAINTEXT");
            properties.put("sasl.kerberos.service.name", "kafka");
            properties.setProperty("zookeeper.session.timeout.ms", String.valueOf(this.zkTimeout));
            properties.setProperty("zookeeper.connection.timeout.ms", String.valueOf(this.zkTimeout));
            properties.setProperty("metadata.fetch.timeout.ms", "120000");
        }
        return properties;
    }

    public String getBrokerConnectionString() {
        return this.brokerConnectionString;
    }

    public String getVersion() {
        return "2.0";
    }

    public List<KafkaServer> getBrokers() {
        return this.brokers;
    }

    public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> list, KafkaDeserializationSchema<T> kafkaDeserializationSchema, Properties properties) {
        return new FlinkKafkaConsumer(list, kafkaDeserializationSchema, properties);
    }

    public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String str, int i, long j) {
        boolean z;
        ArrayList arrayList = new ArrayList();
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        Throwable th = null;
        try {
            kafkaConsumer.assign(Arrays.asList(new TopicPartition(str, i)));
            do {
                z = false;
                Iterator it = kafkaConsumer.poll(j).iterator();
                while (it.hasNext()) {
                    arrayList.add((ConsumerRecord) it.next());
                    z = true;
                }
            } while (z);
            kafkaConsumer.commitSync();
            if (kafkaConsumer != null) {
                if (0 != 0) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            return UnmodifiableList.decorate(arrayList);
        } catch (Throwable th3) {
            if (kafkaConsumer != null) {
                if (0 != 0) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            throw th3;
        }
    }

    public <T> StreamSink<T> getProducerSink(String str, SerializationSchema<T> serializationSchema, Properties properties, FlinkKafkaPartitioner<T> flinkKafkaPartitioner) {
        return new StreamSink<>(new FlinkKafkaProducer(str, serializationSchema, properties, flinkKafkaPartitioner, this.producerSemantic, 5));
    }

    public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> dataStream, String str, KeyedSerializationSchema<T> keyedSerializationSchema, Properties properties, FlinkKafkaPartitioner<T> flinkKafkaPartitioner) {
        return dataStream.addSink(new FlinkKafkaProducer(str, keyedSerializationSchema, properties, Optional.ofNullable(flinkKafkaPartitioner), this.producerSemantic, 5));
    }

    public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> dataStream, String str, SerializationSchema<T> serializationSchema, Properties properties, FlinkKafkaPartitioner<T> flinkKafkaPartitioner) {
        return dataStream.addSink(new FlinkKafkaProducer(str, serializationSchema, properties, flinkKafkaPartitioner, this.producerSemantic, 5));
    }

    public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> dataStream, String str, KafkaSerializationSchema<T> kafkaSerializationSchema, Properties properties) {
        return dataStream.addSink(new FlinkKafkaProducer(str, kafkaSerializationSchema, properties, this.producerSemantic));
    }

    public KafkaTestEnvironment.KafkaOffsetHandler createOffsetHandler() {
        return new KafkaOffsetHandlerImpl();
    }

    public void restartBroker(int i) throws Exception {
        this.brokers.set(i, getKafkaServer(i, this.tmpKafkaDirs.get(i)));
    }

    public int getLeaderToShutDown(String str) throws Exception {
        return ((TopicPartitionInfo) ((TopicDescription) ((Map) AdminClient.create(getStandardProperties()).describeTopics(Collections.singleton(str)).all().get()).get(str)).partitions().get(0)).leader().id();
    }

    public int getBrokerId(KafkaServer kafkaServer) {
        return kafkaServer.config().brokerId();
    }

    public boolean isSecureRunSupported() {
        return true;
    }

    public void shutdown() throws Exception {
        for (KafkaServer kafkaServer : this.brokers) {
            if (kafkaServer != null) {
                kafkaServer.shutdown();
            }
        }
        this.brokers.clear();
        if (this.zookeeper != null) {
            try {
                this.zookeeper.stop();
            } catch (Exception e) {
                LOG.warn("ZK.stop() failed", e);
            }
            this.zookeeper = null;
        }
        if (this.tmpKafkaParent != null && this.tmpKafkaParent.exists()) {
            try {
                FileUtils.deleteDirectory(this.tmpKafkaParent);
            } catch (Exception e2) {
            }
        }
        if (this.tmpZkDir != null && this.tmpZkDir.exists()) {
            try {
                FileUtils.deleteDirectory(this.tmpZkDir);
            } catch (Exception e3) {
            }
        }
        super.shutdown();
    }

    protected KafkaServer getKafkaServer(int i, File file) throws Exception {
        Properties properties = new Properties();
        properties.put("advertised.host.name", "localhost");
        properties.put("broker.id", Integer.toString(i));
        properties.put("log.dir", file.toString());
        properties.put("zookeeper.connect", this.zookeeperConnectionString);
        properties.put("message.max.bytes", String.valueOf(52428800));
        properties.put("replica.fetch.max.bytes", String.valueOf(52428800));
        properties.put("transaction.max.timeout.ms", Integer.toString(7200000));
        properties.put("zookeeper.session.timeout.ms", Integer.valueOf(this.zkTimeout));
        properties.put("zookeeper.connection.timeout.ms", Integer.valueOf(this.zkTimeout));
        if (this.config.getKafkaServerProperties() != null) {
            properties.putAll(this.config.getKafkaServerProperties());
        }
        for (int i2 = 1; i2 <= 5; i2++) {
            int availablePort = NetUtils.getAvailablePort();
            properties.put("port", Integer.toString(availablePort));
            if (this.config.isHideKafkaBehindProxy()) {
                properties.put("advertised.port", Integer.valueOf(createProxy("localhost", availablePort).getLocalPort()));
            }
            if (this.config.isSecureMode()) {
                LOG.info("Adding Kafka secure configurations");
                properties.put("listeners", "SASL_PLAINTEXT://localhost:" + availablePort);
                properties.put("advertised.listeners", "SASL_PLAINTEXT://localhost:" + availablePort);
                properties.putAll(getSecureProperties());
            }
            try {
                KafkaServer kafkaServer = new KafkaServer(new KafkaConfig(properties), Time.SYSTEM, Option.apply((Object) null), new ArraySeq(0));
                kafkaServer.startup();
                return kafkaServer;
            } catch (KafkaException e) {
                if (!(e.getCause() instanceof BindException)) {
                    throw e;
                }
                LOG.info("Port conflict when starting Kafka Broker. Retrying...");
            }
        }
        throw new Exception("Could not start Kafka after 5 retries due to port conflicts.");
    }
}
