package org.apache.flink.connector.kafka.source.testutils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connectors.test.common.external.ExternalContext;
import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;

/* loaded from: input_file:org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.class */
public class KafkaSingleTopicExternalContext implements ExternalContext<String> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSingleTopicExternalContext.class);
    private static final String TOPIC_NAME_PREFIX = "kafka-single-topic";
    private static final int DEFAULT_TIMEOUT = 30;
    private static final int NUM_RECORDS_UPPER_BOUND = 500;
    private static final int NUM_RECORDS_LOWER_BOUND = 100;
    protected String bootstrapServers;
    private final Map<Integer, SourceSplitDataWriter<String>> partitionToSplitWriter = new HashMap();
    private int numSplits = 0;
    private final String topicName = "kafka-single-topic-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
    protected final AdminClient kafkaAdminClient = createAdminClient();

    /* loaded from: input_file:org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext$Factory.class */
    public static class Factory implements ExternalContext.Factory<String> {
        private final KafkaContainer kafkaContainer;

        public Factory(KafkaContainer kafkaContainer) {
            this.kafkaContainer = kafkaContainer;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public String getBootstrapServer() {
            return String.join(",", this.kafkaContainer.getBootstrapServers(), (String) this.kafkaContainer.getNetworkAliases().stream().map(str -> {
                return String.join(":", str, Integer.toString(9092));
            }).collect(Collectors.joining(",")));
        }

        public ExternalContext<String> createExternalContext() {
            return new KafkaSingleTopicExternalContext(getBootstrapServer());
        }
    }

    public KafkaSingleTopicExternalContext(String str) {
        this.bootstrapServers = str;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void createTopic(String str, int i, short s) {
        LOG.debug("Creating new Kafka topic {} with {} partitions and {} replicas", new Object[]{str, Integer.valueOf(i), Short.valueOf(s)});
        try {
            this.kafkaAdminClient.createTopics(Collections.singletonList(new NewTopic(str, i, s))).all().get(30L, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(String.format("Cannot create topic '%s'", str), e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void deleteTopic(String str) {
        LOG.debug("Deleting Kafka topic {}", str);
        try {
            this.kafkaAdminClient.deleteTopics(Collections.singletonList(str)).all().get(30L, TimeUnit.SECONDS);
        } catch (Exception e) {
            if (ExceptionUtils.getRootCause(e) instanceof UnknownTopicOrPartitionException) {
                throw new RuntimeException(String.format("Cannot delete topic '%s'", str), e);
            }
        }
    }

    private AdminClient createAdminClient() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", this.bootstrapServers);
        return AdminClient.create(properties);
    }

    public Source<String, ?, ?> createSource(Boundedness boundedness) {
        KafkaSourceBuilder builder = KafkaSource.builder();
        if (boundedness == Boundedness.BOUNDED) {
            builder = builder.setBounded(OffsetsInitializer.latest());
        }
        return builder.setGroupId("flink-kafka-test").setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)).setTopics(new String[]{this.topicName}).setBootstrapServers(this.bootstrapServers).build();
    }

    public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
        if (this.numSplits == 0) {
            createTopic(this.topicName, 1, (short) 1);
            this.numSplits++;
        } else {
            LOG.debug("Creating new partition for topic {}", this.topicName);
            AdminClient adminClient = this.kafkaAdminClient;
            String str = this.topicName;
            int i = this.numSplits + 1;
            this.numSplits = i;
            adminClient.createPartitions(Collections.singletonMap(str, NewPartitions.increaseTo(i)));
        }
        KafkaPartitionDataWriter kafkaPartitionDataWriter = new KafkaPartitionDataWriter(getKafkaProducerProperties(this.numSplits - 1), new TopicPartition(this.topicName, this.numSplits - 1));
        this.partitionToSplitWriter.put(Integer.valueOf(this.numSplits - 1), kafkaPartitionDataWriter);
        return kafkaPartitionDataWriter;
    }

    public Collection<String> generateTestData(int i, long j) {
        Random random = new Random(j);
        ArrayList arrayList = new ArrayList();
        int nextInt = random.nextInt(400) + NUM_RECORDS_LOWER_BOUND;
        for (int i2 = 0; i2 < nextInt; i2++) {
            arrayList.add(generateRandomString(i, random.nextInt(50) + 1, random));
        }
        return arrayList;
    }

    private String generateRandomString(int i, int i2, Random random) {
        StringBuilder append = new StringBuilder().append(i).append("-");
        for (int i3 = 0; i3 < i2; i3++) {
            append.append("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789".charAt(random.nextInt("ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789".length())));
        }
        return append.toString();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Properties getKafkaProducerProperties(int i) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", this.bootstrapServers);
        properties.setProperty("client.id", String.join("-", "flink-kafka-split-writer", Integer.toString(i), Long.toString(ThreadLocalRandom.current().nextLong(Long.MAX_VALUE))));
        properties.setProperty("key.serializer", ByteArraySerializer.class.getName());
        properties.setProperty("value.serializer", ByteArraySerializer.class.getName());
        return properties;
    }

    public void close() {
        deleteTopic(this.topicName);
        this.partitionToSplitWriter.forEach((num, sourceSplitDataWriter) -> {
            try {
                sourceSplitDataWriter.close();
            } catch (Exception e) {
                this.kafkaAdminClient.close();
                throw new RuntimeException("Cannot close split writer", e);
            }
        });
        this.partitionToSplitWriter.clear();
        this.kafkaAdminClient.close();
    }

    public String toString() {
        return "Single-topic Kafka";
    }
}
