package com.github.ydespreaux.testcontainers.kafka.containers;

import com.github.dockerjava.api.model.Link;
import com.github.ydespreaux.testcontainers.common.IContainer;
import com.github.ydespreaux.testcontainers.common.checks.AbstractCommandWaitStrategy;
import com.github.ydespreaux.testcontainers.common.cmd.Command;
import com.github.ydespreaux.testcontainers.common.utils.ContainerUtils;
import com.github.ydespreaux.testcontainers.kafka.cmd.AclsAddCmd;
import com.github.ydespreaux.testcontainers.kafka.cmd.AclsOperation;
import com.github.ydespreaux.testcontainers.kafka.cmd.KafkaReadyCmd;
import com.github.ydespreaux.testcontainers.kafka.cmd.TopicCreateCommand;
import com.github.ydespreaux.testcontainers.kafka.config.TopicConfiguration;
import com.github.ydespreaux.testcontainers.kafka.security.CertificateUtils;
import com.github.ydespreaux.testcontainers.kafka.security.Certificates;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.ContainerLaunchException;
import org.testcontainers.containers.FixedHostPortGenericContainer;

/* loaded from: input_file:com/github/ydespreaux/testcontainers/kafka/containers/KafkaContainer.class */
public class KafkaContainer extends FixedHostPortGenericContainer<KafkaContainer> implements IContainer<KafkaContainer> {
    private static final String SECRETS_DIRECTORY = "/etc/kafka/secrets";
    private static final String KAFKA_DEFAULT_BASE_URL = "confluentinc/cp-kafka";
    private static final String EXTERNAL_BROKERS_SERVERS = "BROKER://localhost:%d";
    private static final String INTERNAL_BROKERS_SERVERS = "PLAINTEXT://%s:9092";
    private static final String SSL_INTERNAL_BROKERS_SERVERS = "SSL://%s:9093";
    private static final String KEYSTORE_CREDENTIALS_FILENAME = "ks_credentials";
    private static final String TRUSTSTORE_CREDENTIALS_FILENAME = "ts_credentials";
    private static final String BROKER_SERVERS_SYSTEM_PROPERTY = "spring.kafka.bootstrap-servers";
    private static final String SECURITY_PROTOCOL_SYSTEM_PROPERTY = "spring.kafka.security.protocol";
    private static final String KEY_PASSWORD_SYSTEM_PROPERTY = "spring.kafka.ssl.key-password";
    private static final String KEYSTORE_LOCATION_SYSTEM_PROPERTY = "spring.kafka.ssl.key-store-location";
    private static final String KEYSTORE_PASSWORD_SYSTEM_PROPERTY = "spring.kafka.ssl.key-store-password";
    private static final String TRUSTSTORE_LOCATION_SYSTEM_PROPERTY = "spring.kafka.ssl.trust-store-location";
    private static final String TRUSTSTORE_PASSWORD_SYSTEM_PROPERTY = "spring.kafka.ssl.trust-store-password";
    private static final String IDENTIFICATION_ALGORITHM_SYSTEM_PROPERTY = "spring.kafka.properties.ssl.endpoint.identification.algorithm";
    private final int brokersMappingPort;
    private boolean registerSpringbootProperties;
    private final List<TopicCreateCommand> topicCmds;
    private final Map<String, String> systemPropertyMap;
    private final List<AclsAddCmd> aclsCommands;
    private Certificates kafkaServerCertificates;
    private Certificates kafkaClientCertificates;
    private AclsAddCmd administratorAcls;
    private static final Logger log = LoggerFactory.getLogger(KafkaContainer.class);
    private static final Command<KafkaContainer> healthCmd = new KafkaReadyCmd(10);
    private static final Collection<String> FORMATS_VERSION = Collections.unmodifiableList(Arrays.asList("0.10.0", "0.10.1", "0.10.2", "0.11.0", "1.0.0", "1.1.0", "2.0.0"));

    public KafkaContainer(String str) {
        this(str, ContainerUtils.getAvailableMappingPort());
    }

    public KafkaContainer(String str, int i) {
        super("confluentinc/cp-kafka:" + str);
        this.registerSpringbootProperties = true;
        this.topicCmds = new ArrayList();
        this.systemPropertyMap = new HashMap<String, String>() { // from class: com.github.ydespreaux.testcontainers.kafka.containers.KafkaContainer.1
            {
                put(KafkaContainer.BROKER_SERVERS_SYSTEM_PROPERTY, KafkaContainer.BROKER_SERVERS_SYSTEM_PROPERTY);
                put(KafkaContainer.SECURITY_PROTOCOL_SYSTEM_PROPERTY, KafkaContainer.SECURITY_PROTOCOL_SYSTEM_PROPERTY);
                put(KafkaContainer.KEY_PASSWORD_SYSTEM_PROPERTY, KafkaContainer.KEY_PASSWORD_SYSTEM_PROPERTY);
                put(KafkaContainer.KEYSTORE_LOCATION_SYSTEM_PROPERTY, KafkaContainer.KEYSTORE_LOCATION_SYSTEM_PROPERTY);
                put(KafkaContainer.KEYSTORE_PASSWORD_SYSTEM_PROPERTY, KafkaContainer.KEYSTORE_PASSWORD_SYSTEM_PROPERTY);
                put(KafkaContainer.TRUSTSTORE_LOCATION_SYSTEM_PROPERTY, KafkaContainer.TRUSTSTORE_LOCATION_SYSTEM_PROPERTY);
                put(KafkaContainer.TRUSTSTORE_PASSWORD_SYSTEM_PROPERTY, KafkaContainer.TRUSTSTORE_PASSWORD_SYSTEM_PROPERTY);
                put(KafkaContainer.IDENTIFICATION_ALGORITHM_SYSTEM_PROPERTY, KafkaContainer.IDENTIFICATION_ALGORITHM_SYSTEM_PROPERTY);
            }
        };
        this.aclsCommands = new ArrayList();
        this.brokersMappingPort = i;
        withLogConsumer(ContainerUtils.containerLogsConsumer(log));
        waitingFor(new AbstractCommandWaitStrategy(this) { // from class: com.github.ydespreaux.testcontainers.kafka.containers.KafkaContainer.2
            public List<Command> getCheckCommands() {
                return KafkaContainer.this.isSecured() ? Arrays.asList(KafkaContainer.this.administratorAcls, KafkaContainer.healthCmd) : Arrays.asList(KafkaContainer.healthCmd);
            }
        }).withStartupTimeout(Duration.ofSeconds(120L));
    }

    protected void configure() {
        Objects.requireNonNull(getNetwork(), "Network attribut must not be null !!");
        if (isSecured()) {
            withEnv("KAFKA_ADVERTISED_LISTENERS", String.format(EXTERNAL_BROKERS_SERVERS, Integer.valueOf(this.brokersMappingPort)) + "," + String.format(INTERNAL_BROKERS_SERVERS, getNetworkAliases().get(0)) + "," + String.format(SSL_INTERNAL_BROKERS_SERVERS, getNetworkAliases().get(0))).withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:SSL,SSL:SSL,PLAINTEXT:PLAINTEXT").withEnv("KAFKA_SECURITY_INTER_BROKER_PROTOCOL", "SSL").withEnv("KAFKA_SSL_CLIENT_AUTH", "requested").withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "kafka.security.auth.SimpleAclAuthorizer").withEnv("KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM", "").withEnv("KAFKA_LOG4J_LOGGERS", "kafka.authorizer.logger=DEBUG,kafka.authorizer=DEBUG");
        } else {
            withEnv("KAFKA_ADVERTISED_LISTENERS", String.format(EXTERNAL_BROKERS_SERVERS, Integer.valueOf(this.brokersMappingPort)) + "," + String.format(INTERNAL_BROKERS_SERVERS, getNetworkAliases().get(0))).withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT").withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "PLAINTEXT");
        }
        ((KafkaContainer) withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", String.valueOf(1)).withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false").withExposedPorts(new Integer[]{Integer.valueOf(this.brokersMappingPort)}).withFixedExposedPort(this.brokersMappingPort, this.brokersMappingPort)).withCreateContainerCmdModifier(createContainerCmd -> {
            createContainerCmd.withName("testcontainsers-kafka-" + UUID.randomUUID());
        });
    }

    public boolean isSecured() {
        return this.kafkaServerCertificates != null;
    }

    public String getURL() {
        return String.format(EXTERNAL_BROKERS_SERVERS, getFirstMappedPort());
    }

    public String getInternalURL() {
        return String.format(isSecured() ? SSL_INTERNAL_BROKERS_SERVERS : INTERNAL_BROKERS_SERVERS, getNetworkAliases().get(0));
    }

    public void start() {
        super.start();
        if (!CollectionUtils.isEmpty(this.topicCmds)) {
            if (log.isInfoEnabled()) {
                log.info("Start of topics creation...");
            }
            executeCommands(this.topicCmds);
            if (log.isInfoEnabled()) {
                log.info("End of topics creation");
            }
        }
        if (isSecured() && !CollectionUtils.isEmpty(this.aclsCommands)) {
            if (log.isInfoEnabled()) {
                log.info("Start of acls creation...");
            }
            executeCommands(this.aclsCommands);
            if (log.isInfoEnabled()) {
                log.info("End of acls creation");
            }
        }
        if (registerSpringbootProperties()) {
            registerKafkaEnvironment();
        }
    }

    public KafkaContainer withZookeeperPort(Integer num) {
        withEnv("KAFKA_ZOOKEEPER_CONNECT", String.format("zookeeper:%d", num));
        return this;
    }

    public KafkaContainer withFormatMessageVersion(String str) {
        if (str != null) {
            checkFormatMessageVersion(str);
            withEnv("KAFKA_INTER_BROKER_PROTOCOL_VERSION", str);
            withEnv("KAFKA_LOG_MESSAGE_FORMAT_VERSION", str);
        }
        return this;
    }

    private void checkFormatMessageVersion(String str) {
        if (!FORMATS_VERSION.contains(str)) {
            throw new IllegalArgumentException(String.format("Illegal message format version : %s", str));
        }
    }

    public KafkaContainer withZookeeperHostname(String str) {
        if (str != null) {
            withCreateContainerCmdModifier(createContainerCmd -> {
                createContainerCmd.withLinks(new Link[]{new Link(str, "zookeeper")});
            });
        }
        return this;
    }

    public KafkaContainer withBrokerServersSystemProperty(String str) {
        this.systemPropertyMap.put(BROKER_SERVERS_SYSTEM_PROPERTY, str);
        return this;
    }

    public KafkaContainer withSecurityProtocolSystemProperty(String str) {
        this.systemPropertyMap.put(SECURITY_PROTOCOL_SYSTEM_PROPERTY, str);
        return this;
    }

    public KafkaContainer withKeyPasswordSystemProperty(String str) {
        this.systemPropertyMap.put(KEY_PASSWORD_SYSTEM_PROPERTY, str);
        return this;
    }

    public KafkaContainer withKeystoreLocationSystemProperty(String str) {
        this.systemPropertyMap.put(KEYSTORE_LOCATION_SYSTEM_PROPERTY, str);
        return this;
    }

    public KafkaContainer withKeystorePasswordSystemProperty(String str) {
        this.systemPropertyMap.put(KEYSTORE_PASSWORD_SYSTEM_PROPERTY, str);
        return this;
    }

    public KafkaContainer withTruststoreLocationSystemProperty(String str) {
        this.systemPropertyMap.put(TRUSTSTORE_LOCATION_SYSTEM_PROPERTY, str);
        return this;
    }

    public KafkaContainer withTruststorePasswordSystemProperty(String str) {
        this.systemPropertyMap.put(TRUSTSTORE_PASSWORD_SYSTEM_PROPERTY, str);
        return this;
    }

    public KafkaContainer withIdentificationAlgorithmSystemProperty(String str) {
        this.systemPropertyMap.put(IDENTIFICATION_ALGORITHM_SYSTEM_PROPERTY, str);
        return this;
    }

    /* renamed from: withRegisterSpringbootProperties, reason: merged with bridge method [inline-methods] */
    public KafkaContainer m5withRegisterSpringbootProperties(boolean z) {
        this.registerSpringbootProperties = z;
        return this;
    }

    public boolean registerSpringbootProperties() {
        return this.registerSpringbootProperties;
    }

    @Deprecated
    public String getLocalURL() {
        return getInternalURL();
    }

    protected void registerKafkaEnvironment() {
        System.setProperty(this.systemPropertyMap.get(BROKER_SERVERS_SYSTEM_PROPERTY), getURL());
        if (isSecured()) {
            if (this.kafkaClientCertificates == null) {
                if (log.isWarnEnabled()) {
                    log.warn("SSL properties not set in system properties. The client certificates is not defined.");
                    return;
                }
                return;
            }
            System.setProperty(this.systemPropertyMap.get(SECURITY_PROTOCOL_SYSTEM_PROPERTY), "SSL");
            System.setProperty(this.systemPropertyMap.get(KEY_PASSWORD_SYSTEM_PROPERTY), this.kafkaClientCertificates.getKeystorePassword());
            System.setProperty(this.systemPropertyMap.get(KEYSTORE_LOCATION_SYSTEM_PROPERTY), "file:" + this.kafkaClientCertificates.getKeystorePath());
            System.setProperty(this.systemPropertyMap.get(KEYSTORE_PASSWORD_SYSTEM_PROPERTY), this.kafkaClientCertificates.getKeystorePassword());
            if (this.kafkaClientCertificates.getTruststorePath() != null) {
                System.setProperty(this.systemPropertyMap.get(TRUSTSTORE_LOCATION_SYSTEM_PROPERTY), "file:" + this.kafkaClientCertificates.getTruststorePath());
                System.setProperty(this.systemPropertyMap.get(TRUSTSTORE_PASSWORD_SYSTEM_PROPERTY), this.kafkaClientCertificates.getTruststorePassword());
            }
            System.setProperty(this.systemPropertyMap.get(IDENTIFICATION_ALGORITHM_SYSTEM_PROPERTY), "");
        }
    }

    public KafkaContainer withKafkaServerCertificates(Certificates certificates) {
        if (certificates == null) {
            return this;
        }
        if (this.kafkaServerCertificates != null) {
            throw new IllegalArgumentException("Certificates is already initialized.");
        }
        this.kafkaServerCertificates = certificates;
        addFileSystemBind(certificates.getKeystorePath().toString(), "/etc/kafka/secrets/" + certificates.getKeystorePath().getFileName(), BindMode.READ_ONLY);
        try {
            addFileSystemBind(CertificateUtils.generateCredentials(certificates.getKeystorePassword()).toString(), "/etc/kafka/secrets/ks_credentials", BindMode.READ_ONLY);
            if (certificates.getTruststorePath() != null) {
                addFileSystemBind(certificates.getTruststorePath().toString(), "/etc/kafka/secrets/" + certificates.getTruststorePath().getFileName(), BindMode.READ_ONLY);
                try {
                    addFileSystemBind(CertificateUtils.generateCredentials(certificates.getTruststorePassword()).toString(), "/etc/kafka/secrets/ts_credentials", BindMode.READ_ONLY);
                } catch (IOException e) {
                    throw new ContainerLaunchException("Generating credentials failed: ", e);
                }
            }
            this.administratorAcls = new AclsAddCmd(certificates).operation(AclsOperation.ALL).topic("*").group("*").cluster("kafka-cluster");
            withEnv("KAFKA_SECURITY_PROTOCOL", "SSL");
            withEnv("KAFKA_SSL_KEYSTORE_FILENAME", certificates.getKeystorePath().getFileName().toString());
            withEnv("KAFKA_SSL_KEYSTORE_CREDENTIALS", KEYSTORE_CREDENTIALS_FILENAME);
            withEnv("KAFKA_SSL_KEY_CREDENTIALS", KEYSTORE_CREDENTIALS_FILENAME);
            if (certificates.getTruststorePath() != null) {
                withEnv("KAFKA_SSL_TRUSTSTORE_FILENAME", certificates.getTruststorePath().getFileName().toString());
                withEnv("KAFKA_SSL_TRUSTSTORE_CREDENTIALS", TRUSTSTORE_CREDENTIALS_FILENAME);
            }
            return this;
        } catch (IOException e2) {
            throw new ContainerLaunchException("Generating credentials failed: ", e2);
        }
    }

    public KafkaContainer withKafkaClientCertificates(Certificates certificates) {
        this.kafkaClientCertificates = certificates;
        return this;
    }

    public KafkaContainer withTopics(List<TopicConfiguration> list) {
        if (CollectionUtils.isEmpty(list)) {
            return this;
        }
        List<? extends Command> list2 = (List) list.stream().map(TopicCreateCommand::new).collect(Collectors.toList());
        if (isRunning()) {
            executeCommands(list2);
        } else {
            this.topicCmds.addAll(list2);
        }
        return this;
    }

    public KafkaContainer withAcls(List<AclsAddCmd> list) {
        if (isRunning()) {
            executeCommands(list);
        } else {
            this.aclsCommands.addAll(list);
        }
        return self();
    }

    private void executeCommands(List<? extends Command> list) {
        if (CollectionUtils.isEmpty(list)) {
            return;
        }
        list.forEach(command -> {
            command.execute(this);
            if (log.isInfoEnabled()) {
                log.info("Command executed : {}", command.toString());
            }
        });
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        return (obj instanceof KafkaContainer) && super.equals(obj) && this.brokersMappingPort == ((KafkaContainer) obj).brokersMappingPort;
    }

    public int hashCode() {
        return Objects.hash(Integer.valueOf(super.hashCode()), Integer.valueOf(this.brokersMappingPort));
    }

    public Certificates getKafkaServerCertificates() {
        return this.kafkaServerCertificates;
    }

    public Certificates getKafkaClientCertificates() {
        return this.kafkaClientCertificates;
    }
}
