package org.apache.flink.connector.kafka.testutils;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource;
import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.connectors.kafka.DynamicKafkaSourceTestHelper;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContext.class */
public class DynamicKafkaSourceExternalContext implements DataStreamSourceExternalContext<String> {
    private static final int NUM_TEST_RECORDS_PER_SPLIT = 10;
    private static final int NUM_PARTITIONS = 1;
    private final List<URL> connectorJarPaths;
    private final Map<String, Properties> clusterPropertiesMap;
    private final long randomTopicSuffix;
    private static final Logger logger = LoggerFactory.getLogger(DynamicKafkaSourceExternalContext.class);
    private static final Pattern STREAM_ID_PATTERN = Pattern.compile("stream-[0-9]+");
    private final Set<KafkaStream> kafkaStreams = new HashSet();
    private final List<SplitDataWriter> splitDataWriters = new ArrayList();

    /* loaded from: input_file:org/apache/flink/connector/kafka/testutils/DynamicKafkaSourceExternalContext$SplitDataWriter.class */
    private static class SplitDataWriter implements ExternalSystemSplitDataWriter<String> {
        private final Map<String, Properties> clusterPropertiesMap;
        private final List<Tuple2<String, String>> clusterTopics;

        public SplitDataWriter(Map<String, Properties> map, List<Tuple2<String, String>> list) {
            this.clusterPropertiesMap = map;
            this.clusterTopics = list;
        }

        public void writeRecords(List<String> list) {
            int i = 0;
            try {
                for (Tuple2<String, String> tuple2 : this.clusterTopics) {
                    String str = (String) tuple2.f0;
                    String str2 = (String) tuple2.f1;
                    ArrayList arrayList = new ArrayList();
                    for (int i2 = 0; i2 < DynamicKafkaSourceExternalContext.NUM_PARTITIONS; i2 += DynamicKafkaSourceExternalContext.NUM_PARTITIONS) {
                        for (int i3 = 0; i3 < 10 && list.size() > i; i3 += DynamicKafkaSourceExternalContext.NUM_PARTITIONS) {
                            Integer valueOf = Integer.valueOf(i2);
                            int i4 = i;
                            i += DynamicKafkaSourceExternalContext.NUM_PARTITIONS;
                            arrayList.add(new ProducerRecord(str2, valueOf, (Object) null, list.get(i4)));
                        }
                    }
                    DynamicKafkaSourceExternalContext.logger.info("Writing producer records: {}", arrayList);
                    DynamicKafkaSourceTestHelper.produceToKafka(this.clusterPropertiesMap.get(str), arrayList, StringSerializer.class, StringSerializer.class);
                }
            } catch (Throwable th) {
                throw new RuntimeException("Failed to produce test data", th);
            }
        }

        public void close() throws Exception {
        }

        public List<Tuple2<String, String>> getClusterTopics() {
            return this.clusterTopics;
        }
    }

    public DynamicKafkaSourceExternalContext(List<String> list, List<URL> list2) {
        this.connectorJarPaths = list2;
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", list.get(0));
        Properties properties2 = new Properties();
        properties2.setProperty("bootstrap.servers", list.get(NUM_PARTITIONS));
        this.clusterPropertiesMap = ImmutableMap.of("cluster0", properties, "cluster1", properties2);
        this.randomTopicSuffix = ThreadLocalRandom.current().nextLong(0L, Long.MAX_VALUE);
    }

    public Source<String, ?, ?> createSource(TestingSourceSettings testingSourceSettings) throws UnsupportedOperationException {
        DynamicKafkaSourceBuilder builder = DynamicKafkaSource.builder();
        builder.setStreamPattern(STREAM_ID_PATTERN).setKafkaMetadataService(new MockKafkaMetadataService(this.kafkaStreams)).setGroupId("DynamicKafkaSourceExternalContext").setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
        if (testingSourceSettings.getBoundedness().equals(Boundedness.BOUNDED)) {
            builder.setBounded(OffsetsInitializer.latest());
        }
        return builder.build();
    }

    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(TestingSourceSettings testingSourceSettings) {
        SplitDataWriter splitDataWriter = new SplitDataWriter(this.clusterPropertiesMap, setupSplits(String.valueOf(this.splitDataWriters.size())));
        this.splitDataWriters.add(splitDataWriter);
        return splitDataWriter;
    }

    private List<Tuple2<String, String>> setupSplits(String str) {
        KafkaStream kafkaStream = getKafkaStream(str + this.randomTopicSuffix);
        logger.info("Setting up splits for {}", kafkaStream);
        List<Tuple2<String, String>> list = (List) kafkaStream.getClusterMetadataMap().entrySet().stream().flatMap(entry -> {
            return ((ClusterMetadata) entry.getValue()).getTopics().stream().map(str2 -> {
                return Tuple2.of(entry.getKey(), str2);
            });
        }).collect(Collectors.toList());
        for (Tuple2<String, String> tuple2 : list) {
            KafkaTestEnvironmentImpl.createNewTopic((String) tuple2.f1, NUM_PARTITIONS, NUM_PARTITIONS, this.clusterPropertiesMap.get((String) tuple2.f0));
        }
        this.kafkaStreams.add(kafkaStream);
        return list;
    }

    private KafkaStream getKafkaStream(String str) {
        return new KafkaStream("stream-" + str, ImmutableMap.of("cluster0", new ClusterMetadata(ImmutableSet.of("topic0-" + str, "topic1-" + str), this.clusterPropertiesMap.get("cluster0")), "cluster1", new ClusterMetadata(ImmutableSet.of("topic2-" + str, "topic3-" + str), this.clusterPropertiesMap.get("cluster1"))));
    }

    public List<String> generateTestData(TestingSourceSettings testingSourceSettings, int i, long j) {
        return (List) IntStream.range(0, 10).boxed().map(num -> {
            return Integer.toString(num.intValue());
        }).collect(Collectors.toList());
    }

    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }

    public List<URL> getConnectorJarPaths() {
        return this.connectorJarPaths;
    }

    public void close() throws Exception {
        HashMap hashMap = new HashMap();
        Iterator<SplitDataWriter> it = this.splitDataWriters.iterator();
        while (it.hasNext()) {
            for (Tuple2<String, String> tuple2 : it.next().getClusterTopics()) {
                ((List) hashMap.computeIfAbsent(tuple2.f0, str -> {
                    return new ArrayList();
                })).add(tuple2.f1);
            }
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            String str2 = (String) entry.getKey();
            List list = (List) entry.getValue();
            AdminClient create = AdminClient.create(this.clusterPropertiesMap.get(str2));
            Throwable th = null;
            try {
                try {
                    create.deleteTopics(list).all().get();
                    CommonTestUtils.waitUtil(() -> {
                        try {
                            Stream map = ((Collection) create.listTopics().listings().get()).stream().map((v0) -> {
                                return v0.name();
                            });
                            list.getClass();
                            return Boolean.valueOf(map.noneMatch((v1) -> {
                                return r1.contains(v1);
                            }));
                        } catch (Exception e) {
                            logger.warn("Exception caught when listing Kafka topics", e);
                            return false;
                        }
                    }, Duration.ofSeconds(30L), String.format("Topics %s were not deleted within timeout", list));
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                    logger.info("topics {} are deleted from {}", list, str2);
                } catch (Throwable th3) {
                    if (create != null) {
                        if (th != null) {
                            try {
                                create.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            create.close();
                        }
                    }
                    throw th3;
                }
            } finally {
            }
        }
    }
}
