package org.apache.beam.it.kafka;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import javax.annotation.Nullable;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.testcontainers.TestContainerResourceManager;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
import org.checkerframework.dataflow.qual.Pure;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:org/apache/beam/it/kafka/KafkaResourceManager.class */
public class KafkaResourceManager extends TestContainerResourceManager<GenericContainer<?>> implements ResourceManager {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(KafkaResourceManager.class);
    /** Container image name that {@link Builder} passes to its parent builder. */
    private static final String DEFAULT_KAFKA_CONTAINER_NAME = "confluentinc/cp-kafka";
    /** Container image tag that {@link Builder} passes to its parent builder. */
    private static final String DEFAULT_KAFKA_CONTAINER_TAG = "7.3.1";
    /** Broker port handed to {@code getPort(...)} when the connection string is built. */
    private static final int KAFKA_BROKER_PORT = 9093;
    /** Admin client used to list, create and delete topics. */
    private final AdminClient kafkaClient;
    /**
     * Topic names tracked by this manager: either the names supplied through the builder or names
     * generated in the constructor, plus any topic newly created by {@link #createTopic}.
     */
    private final Set<String> topicNames;
    /** Broker address in the form {@code PLAINTEXT://host:port}, computed once in the constructor. */
    private final String connectionString;
    /**
     * True when the builder supplied explicit topic names; in that case {@link #cleanupAll()} does
     * not delete topics.
     */
    private final boolean usingStaticTopic;

    /**
     * Builder for {@link KafkaResourceManager}.
     *
     * <p>Topics are configured either as an explicit set of names ({@link #setTopicNames}) or as a
     * count of names to generate ({@link #setNumTopics}).
     */
    public static final class Builder extends TestContainerResourceManager.Builder<KafkaResourceManager> {
        /** Explicit topic names; empty unless {@link #setTopicNames} was called with names. */
        private final Set<String> topicNames = new HashSet<>();
        /** Number of topic names to generate when no explicit names are given; starts at 0. */
        int numTopics;

        /**
         * Creates a builder that uses the default Kafka image name and tag.
         *
         * @param testId identifier forwarded to the parent builder
         */
        private Builder(String testId) {
            super(testId, KafkaResourceManager.DEFAULT_KAFKA_CONTAINER_NAME, KafkaResourceManager.DEFAULT_KAFKA_CONTAINER_TAG);
        }

        /**
         * Replaces any previously configured topic names with the given ones.
         *
         * @param names topic names to use
         * @return this builder
         */
        public Builder setTopicNames(Set<String> names) {
            this.topicNames.clear();
            this.topicNames.addAll(names);
            return this;
        }

        /**
         * Sets how many topic names the manager should generate.
         *
         * @param count number of topics; must be non-negative
         * @return this builder
         * @throws IllegalArgumentException if {@code count} is negative or explicit topic names
         *     have already been set
         */
        public Builder setNumTopics(int count) {
            Preconditions.checkArgument(count >= 0, "numTopics must be non-negative.");
            Preconditions.checkArgument(
                    this.topicNames.isEmpty(),
                    "setTopicNames and setNumTopics cannot be set at the same time.");
            this.numTopics = count;
            return this;
        }

        /**
         * Builds the resource manager from this builder's settings.
         *
         * <p>NOTE(review): the name looks like a decompiler rename of {@code build()} (bridge
         * method merge); confirm against the parent builder before renaming.
         *
         * @return a new {@link KafkaResourceManager}
         */
        public KafkaResourceManager m1build() {
            return new KafkaResourceManager(this);
        }
    }

    /* loaded from: input_file:org/apache/beam/it/kafka/KafkaResourceManager$DefaultKafkaContainer.class */
    static class DefaultKafkaContainer extends KafkaContainer {

        @Nullable
        private final String host;

        public DefaultKafkaContainer(Builder builder) {
            super(DockerImageName.parse(builder.containerImageName).withTag(builder.containerImageTag));
            this.host = builder.host;
        }

        public String getHost() {
            return this.host == null ? super.getHost() : this.host;
        }

        @EnsuresNonNullIf(expression = {"#1"}, result = true)
        @Pure
        public boolean equals(Object obj) {
            return super.equals(obj);
        }

        @Pure
        public int hashCode() {
            return super.hashCode();
        }
    }

    /**
     * Creates a manager backed by a new {@link DefaultKafkaContainer} configured from the builder.
     *
     * <p>Passes {@code null} as the admin client, so the three-argument constructor creates one
     * from the connection string.
     *
     * @param builder builder holding the container and topic settings
     */
    private KafkaResourceManager(Builder builder) {
        this(null, new DefaultKafkaContainer(builder), builder);
    }

    @VisibleForTesting
    KafkaResourceManager(@Nullable AdminClient adminClient, KafkaContainer kafkaContainer, Builder builder) {
        super(kafkaContainer, builder);
        this.usingStaticTopic = builder.topicNames.size() > 0;
        if (this.usingStaticTopic) {
            this.topicNames = new HashSet(builder.topicNames);
        } else {
            HashSet hashSet = new HashSet();
            for (int i = 0; i < builder.numTopics; i++) {
                hashSet.add(KafkaResourceManagerUtils.generateTopicName(String.format("%s-%d", builder.testId, Integer.valueOf(i))));
            }
            this.topicNames = new HashSet(hashSet);
        }
        this.connectionString = String.format("PLAINTEXT://%s:%d", getHost(), Integer.valueOf(getPort(KAFKA_BROKER_PORT)));
        this.kafkaClient = adminClient != null ? adminClient : AdminClient.create(ImmutableMap.of("bootstrap.servers", this.connectionString));
    }

    /**
     * Returns a new {@link Builder} for a Kafka resource manager.
     *
     * @param str identifier forwarded to the builder (used as the test id)
     * @return a fresh builder
     */
    public static Builder builder(String str) {
        return new Builder(str);
    }

    /**
     * Returns the broker connection string computed in the constructor.
     *
     * @return address in the form {@code PLAINTEXT://host:port}
     */
    public synchronized String getBootstrapServers() {
        return this.connectionString;
    }

    /**
     * Returns the topic names currently tracked by this manager.
     *
     * <p>The result is an unmodifiable snapshot taken while holding this manager's monitor. The
     * internal set is mutated by {@link #createTopic}, so handing out the live set would let
     * callers read it outside the lock and change the manager's bookkeeping.
     *
     * @return unmodifiable copy of the tracked topic names
     */
    public synchronized Set<String> getTopicNames() {
        return Collections.unmodifiableSet(new HashSet<>(this.topicNames));
    }

    /**
     * Creates a topic with the given number of partitions unless one with the generated name
     * already exists.
     *
     * <p>The requested name is first passed through
     * {@code KafkaResourceManagerUtils.generateTopicName}. A newly created topic is added to the
     * set of tracked topic names; a topic that already existed is left untouched and is not added.
     *
     * @param topicName requested topic name
     * @param partitions number of partitions; must be positive
     * @return the generated topic name
     * @throws IllegalArgumentException if {@code partitions} is not positive
     * @throws KafkaResourceManagerException if listing or creating topics fails or the thread is
     *     interrupted while waiting
     */
    public synchronized String createTopic(String topicName, int partitions) throws KafkaResourceManagerException {
        Preconditions.checkArgument(partitions > 0, "partitions must be positive.");
        String uniqueName = KafkaResourceManagerUtils.generateTopicName(topicName);
        try {
            Set<String> existingTopics = this.kafkaClient.listTopics().names().get();
            if (existingTopics.contains(uniqueName)) {
                // Nothing was created, so do not claim success or start tracking the topic.
                LOG.info("Topic {} already exists; skipping creation.", uniqueName);
                return uniqueName;
            }
            NewTopic newTopic = new NewTopic(uniqueName, partitions, (short) 1);
            this.kafkaClient.createTopics(Collections.singletonList(newTopic)).all().get();
            this.topicNames.add(uniqueName);
            LOG.info("Successfully created topic {}.", uniqueName);
            return uniqueName;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new KafkaResourceManagerException("Error creating topics.", e);
        } catch (Exception e) {
            throw new KafkaResourceManagerException("Error creating topics.", e);
        }
    }

    /**
     * Builds a Kafka producer pointed at this manager's bootstrap servers.
     *
     * @param keySerializer serializer for record keys
     * @param valueSerializer serializer for record values
     * @param <K> key type
     * @param <V> value type
     * @return a new producer; the caller is responsible for closing it
     */
    public synchronized <K, V> KafkaProducer<K, V> buildProducer(Serializer<K> keySerializer, Serializer<V> valueSerializer) {
        ImmutableMap<String, Object> config = ImmutableMap.of("bootstrap.servers", getBootstrapServers());
        return new KafkaProducer<>(config, keySerializer, valueSerializer);
    }

    /**
     * Builds a Kafka consumer pointed at this manager's bootstrap servers.
     *
     * <p>The {@code PLAINTEXT://} prefix is stripped from the connection string, a random
     * {@code cg-<uuid>} group id is used, and {@code auto.offset.reset} is set to
     * {@code earliest}.
     *
     * @param keyDeserializer deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @param <K> key type
     * @param <V> value type
     * @return a new consumer; the caller is responsible for closing it
     */
    public synchronized <K, V> KafkaConsumer<K, V> buildConsumer(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        String servers = getBootstrapServers().replace("PLAINTEXT://", "");
        String groupId = "cg-" + UUID.randomUUID();
        ImmutableMap<String, Object> config =
                ImmutableMap.of(
                        "bootstrap.servers", servers,
                        "group.id", groupId,
                        "auto.offset.reset", "earliest");
        return new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
    }

    /**
     * Deletes the tracked topics (unless explicit topic names were supplied through the builder),
     * closes the admin client and then runs the parent cleanup.
     *
     * <p>The admin client is closed and the parent cleanup runs even when topic deletion fails, so
     * a failed deletion no longer skips the remaining teardown. The failure is reported afterwards.
     *
     * @throws KafkaResourceManagerException if deleting topics or closing the admin client failed
     */
    @Override
    public synchronized void cleanupAll() throws KafkaResourceManagerException {
        LOG.info("Attempting to cleanup Kafka manager.");

        boolean producedError = false;
        boolean interrupted = false;

        // Topics named explicitly through the builder are left in place.
        if (!this.usingStaticTopic) {
            try {
                this.kafkaClient.deleteTopics(this.topicNames).all().get();
            } catch (InterruptedException e) {
                LOG.error("Failed to delete kafka topic.", e);
                producedError = true;
                interrupted = true;
            } catch (Exception e) {
                LOG.error("Failed to delete kafka topic.", e);
                producedError = true;
            }
        }

        try {
            try {
                // Release the admin client's resources; it is not usable after cleanup anyway.
                this.kafkaClient.close();
            } catch (RuntimeException e) {
                LOG.error("Failed to close Kafka admin client.", e);
                producedError = true;
            }

            // Always run the parent cleanup, even if the steps above failed.
            super.cleanupAll();
        } finally {
            // Re-assert the interrupt only after teardown so it cannot cut the teardown short.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        if (producedError) {
            throw new KafkaResourceManagerException("Failed to delete resources. Check above for errors.");
        }
        LOG.info("Kafka manager successfully cleaned up.");
    }
}
