package io.quarkus.kafka.client.deployment;

import com.github.dockerjava.api.command.InspectContainerResponse;
import io.quarkus.bootstrap.classloading.QuarkusClassLoader;
import io.quarkus.deployment.IsDockerWorking;
import io.quarkus.deployment.IsNormal;
import io.quarkus.deployment.annotations.BuildProducer;
import io.quarkus.deployment.annotations.BuildStep;
import io.quarkus.deployment.builditem.DevServicesNativeConfigResultBuildItem;
import io.quarkus.deployment.builditem.LaunchModeBuildItem;
import io.quarkus.deployment.builditem.RunTimeConfigurationDefaultBuildItem;
import io.quarkus.deployment.builditem.ServiceStartBuildItem;
import io.quarkus.runtime.LaunchMode;
import io.quarkus.runtime.configuration.ConfigUtils;
import java.io.Closeable;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.jboss.logging.Logger;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor.class */
public class DevServicesKafkaProcessor {
    // Port the Kafka API listens on inside the container; exposed and (optionally) bound to a fixed host port.
    private static final int KAFKA_PORT = 9092;
    // Configuration key that is checked before starting, and populated once a dev broker is running.
    private static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
    // Handle used to stop the currently running dev broker; null when no broker is running.
    static volatile Closeable closeable;
    // Configuration used for the last start attempt; compared against the new one to decide on a restart.
    static volatile KafkaDevServiceCfg cfg;
    // Queried in startKafka before a container is created.
    private final IsDockerWorking isDockerWorking = new IsDockerWorking(true);
    private static final Logger log = Logger.getLogger(DevServicesKafkaProcessor.class);
    // True until the shutdown hook / class loader close tasks have been registered; reset by the close task.
    static volatile boolean first = true;

    /**
     * Immutable description of a started dev broker: the bootstrap servers URL
     * clients should connect to, and the handle that stops the broker.
     */
    public static class KafkaBroker {
        private final String bootstrapServers;
        private final Closeable stopHandle;

        /**
         * @param str the bootstrap servers URL of the broker
         * @param closeable the handle to invoke in order to stop the broker
         */
        public KafkaBroker(String str, Closeable closeable) {
            this.bootstrapServers = str;
            this.stopHandle = closeable;
        }

        /** @return the bootstrap servers URL given at construction time */
        public String getBootstrapServers() {
            return bootstrapServers;
        }

        /** @return the handle given at construction time */
        public Closeable getCloseable() {
            return stopHandle;
        }
    }

    /**
     * Value object capturing the settings that drive the Kafka dev service.
     * Two instances are equal when all three settings are equal, which is how
     * the processor detects that a running broker must be restarted.
     */
    public static final class KafkaDevServiceCfg {
        private final boolean devServicesEnabled;
        private final String imageName;
        private final Integer fixedExposedPort;

        /**
         * @param z whether the dev service is enabled
         * @param str the container image name
         * @param num the fixed host port to bind
         */
        public KafkaDevServiceCfg(boolean z, String str, Integer num) {
            this.devServicesEnabled = z;
            this.imageName = str;
            this.fixedExposedPort = num;
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            // The class is final, so an instanceof test is equivalent to comparing getClass().
            if (!(obj instanceof KafkaDevServiceCfg)) {
                return false;
            }
            KafkaDevServiceCfg other = (KafkaDevServiceCfg) obj;
            if (devServicesEnabled != other.devServicesEnabled) {
                return false;
            }
            if (!Objects.equals(imageName, other.imageName)) {
                return false;
            }
            return Objects.equals(fixedExposedPort, other.fixedExposedPort);
        }

        @Override
        public int hashCode() {
            return Objects.hash(devServicesEnabled, imageName, fixedExposedPort);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/quarkus/kafka/client/deployment/DevServicesKafkaProcessor$RedPandaKafkaContainer.class */
    public static final class RedPandaKafkaContainer extends GenericContainer<RedPandaKafkaContainer> {
        private final int port;
        private static final String STARTER_SCRIPT = "/redpanda.sh";

        private RedPandaKafkaContainer(DockerImageName dockerImageName, int i) {
            super(dockerImageName);
            this.port = i;
            withNetwork(Network.SHARED);
            withExposedPorts(new Integer[]{Integer.valueOf(DevServicesKafkaProcessor.KAFKA_PORT)});
            if (!dockerImageName.getRepository().equals("vectorized/redpanda")) {
                throw new IllegalArgumentException("Only vectorized/redpanda images are supported");
            }
            withCreateContainerCmdModifier(createContainerCmd -> {
                createContainerCmd.withEntrypoint(new String[]{"sh"});
            });
            withCommand(new String[]{"-c", "while [ ! -f /redpanda.sh ]; do sleep 0.1; done; /redpanda.sh"});
            waitingFor(Wait.forLogMessage(".*Started Kafka API server.*", 1));
        }

        protected void containerIsStarting(InspectContainerResponse inspectContainerResponse, boolean z) {
            super.containerIsStarting(inspectContainerResponse, z);
            copyFileToContainer(Transferable.of(((("#!/bin/bash\n" + "/usr/bin/rpk redpanda start --check=false --node-id 0 ") + "--kafka-addr PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 ") + "--advertise-kafka-addr PLAINTEXT://kafka:29092,OUTSIDE://" + getHost() + ":" + getMappedPort(DevServicesKafkaProcessor.KAFKA_PORT)).getBytes(StandardCharsets.UTF_8), 511), STARTER_SCRIPT);
        }

        protected void configure() {
            super.configure();
            if (this.port > 0) {
                addFixedExposedPort(this.port, DevServicesKafkaProcessor.KAFKA_PORT);
            }
        }

        public String getBootstrapServers() {
            return String.format("PLAINTEXT://%s:%s", getContainerIpAddress(), getMappedPort(DevServicesKafkaProcessor.KAFKA_PORT));
        }
    }

    /**
     * Build step that starts the Kafka dev service, or restarts it when a broker is
     * already running and either the launch mode is TEST or the configuration changed.
     *
     * <p>On the first invocation it also registers a close task on the parent class loader
     * and a JVM shutdown hook, both of which stop the broker and reset the static state.
     *
     * @param launchModeBuildItem supplies the current launch mode
     * @param kafkaBuildTimeConfig build-time configuration holding the dev service settings
     * @param buildProducer receives the {@code kafka.bootstrap.servers} runtime default
     * @param buildProducer2 receives the {@code kafka.bootstrap.servers} native config result
     * @param buildProducer3 not used by this method
     * @return the build item describing the started broker, or {@code null} when no broker
     *         was started (including when the running broker is reused)
     */
    @BuildStep(onlyIfNot = {IsNormal.class})
    public DevServicesKafkaBrokerBuildItem startKafkaDevService(LaunchModeBuildItem launchModeBuildItem, KafkaBuildTimeConfig kafkaBuildTimeConfig, BuildProducer<RunTimeConfigurationDefaultBuildItem> buildProducer, BuildProducer<DevServicesNativeConfigResultBuildItem> buildProducer2, BuildProducer<ServiceStartBuildItem> buildProducer3) {
        KafkaDevServiceCfg configuration = getConfiguration(kafkaBuildTimeConfig);
        if (closeable != null) {
            boolean restartRequired = launchModeBuildItem.getLaunchMode() == LaunchMode.TEST
                    || !configuration.equals(cfg);
            if (!restartRequired) {
                // Same configuration outside of test mode: keep the running broker.
                return null;
            }
            shutdownBroker();
            cfg = null;
        }

        KafkaBroker startKafka = startKafka(configuration);
        DevServicesKafkaBrokerBuildItem devServicesKafkaBrokerBuildItem = null;
        if (startKafka != null) {
            closeable = startKafka.getCloseable();
            buildProducer.produce(new RunTimeConfigurationDefaultBuildItem(KAFKA_BOOTSTRAP_SERVERS, startKafka.getBootstrapServers()));
            devServicesKafkaBrokerBuildItem = new DevServicesKafkaBrokerBuildItem(startKafka.getBootstrapServers());
        }

        if (first) {
            first = false;
            Runnable closeTask = () -> {
                if (closeable != null) {
                    shutdownBroker();
                }
                first = true;
                closeable = null;
                cfg = null;
            };
            // getContextClassLoader() and parent() are typed as ClassLoader, so explicit casts
            // are required to reach QuarkusClassLoader#parent and #addCloseTask.
            QuarkusClassLoader contextClassLoader = (QuarkusClassLoader) Thread.currentThread().getContextClassLoader();
            QuarkusClassLoader parentClassLoader = (QuarkusClassLoader) contextClassLoader.parent();
            parentClassLoader.addCloseTask(closeTask);
            final Thread shutdownHook = new Thread(closeTask, "Kafka container shutdown thread");
            Runtime.getRuntime().addShutdownHook(shutdownHook);
            // Once the class loader is closed the hook is no longer needed.
            parentClassLoader.addCloseTask(() -> Runtime.getRuntime().removeShutdownHook(shutdownHook));
        }

        cfg = configuration;
        if (devServicesKafkaBrokerBuildItem != null) {
            log.infof("Dev Services for Kafka started. Start applications that need to use the same Kafka broker using -Dkafka.bootstrap.servers=%s", devServicesKafkaBrokerBuildItem.getBootstrapServers());
            buildProducer2.produce(new DevServicesNativeConfigResultBuildItem(KAFKA_BOOTSTRAP_SERVERS, devServicesKafkaBrokerBuildItem.getBootstrapServers()));
        }
        return devServicesKafkaBrokerBuildItem;
    }

    /**
     * Stops the running broker, if any. A failure to stop is logged rather than
     * propagated, and the static handle is cleared in every case.
     */
    private void shutdownBroker() {
        if (closeable == null) {
            return;
        }
        try {
            closeable.close();
        } catch (Throwable failure) {
            log.error("Failed to stop the Kafka broker", failure);
        } finally {
            // Always drop the handle so a later build step starts from a clean state.
            closeable = null;
        }
    }

    /**
     * Starts a Redpanda container for the given configuration, unless one of the
     * preconditions fails: the dev service is disabled, {@code kafka.bootstrap.servers}
     * is already configured, the channel check reports nothing to serve, or Docker
     * is not working.
     *
     * @param kafkaDevServiceCfg the dev service settings to apply
     * @return the started broker, or {@code null} when no broker was started
     */
    private KafkaBroker startKafka(KafkaDevServiceCfg kafkaDevServiceCfg) {
        if (!kafkaDevServiceCfg.devServicesEnabled) {
            log.debug("Not starting dev services for Kafka, as it has been disabled in the config.");
            return null;
        }

        if (ConfigUtils.isPropertyPresent(KAFKA_BOOTSTRAP_SERVERS)) {
            log.debug("Not starting dev services for Kafka, the kafka.bootstrap.servers is configured.");
            return null;
        }

        boolean channelNeedsBroker = hasKafkaChannelWithoutBootstrapServers();
        if (!channelNeedsBroker) {
            log.debug("Not starting dev services for Kafka, all the channels are configured.");
            return null;
        }

        boolean dockerAvailable = this.isDockerWorking.getAsBoolean();
        if (!dockerAvailable) {
            log.warn("Docker isn't working, please configure the Kafka bootstrap servers property (kafka.bootstrap.servers).");
            return null;
        }

        DockerImageName image = DockerImageName.parse(kafkaDevServiceCfg.imageName);
        int fixedPort = kafkaDevServiceCfg.fixedExposedPort.intValue();
        RedPandaKafkaContainer container = new RedPandaKafkaContainer(image, fixedPort);
        container.start();
        // Closing the returned handle closes the container.
        return new KafkaBroker(container.getBootstrapServers(), container::close);
    }

    /**
     * Walks every configuration property name and returns {@code true} as soon as it
     * meets a property that is NOT an incoming/outgoing {@code smallrye-kafka}
     * {@code .connector} entry whose matching {@code .bootstrap.servers} property is present.
     * Returns {@code false} only when every property passes that test (or there are none).
     *
     * <p>NOTE(review): as written, any unrelated property makes this return {@code true},
     * which looks broader than the method name suggests. Confirm the intended semantics
     * before tightening the check, since doing so changes when the dev service starts.
     *
     * @return whether a property that is not a fully configured Kafka connector was found
     */
    private boolean hasKafkaChannelWithoutBootstrapServers() {
        Config config = ConfigProvider.getConfig();
        for (String propertyName : config.getPropertyNames()) {
            boolean channelProperty = propertyName.startsWith("mp.messaging.incoming.")
                    || propertyName.startsWith("mp.messaging.outgoing.");

            boolean kafkaConnector = false;
            if (propertyName.endsWith(".connector")) {
                String connector = config.getOptionalValue(propertyName, String.class).orElse("ignored");
                kafkaConnector = "smallrye-kafka".equals(connector);
            }

            boolean fullyConfigured = channelProperty
                    && kafkaConnector
                    && ConfigUtils.isPropertyPresent(propertyName.replace(".connector", ".bootstrap.servers"));
            if (!fullyConfigured) {
                return true;
            }
        }
        return false;
    }

    /**
     * Extracts the dev service settings from the build-time configuration, defaulting
     * to enabled when {@code enabled} is absent and to port 0 when {@code port} is absent.
     *
     * @param kafkaBuildTimeConfig the Kafka build-time configuration
     * @return a snapshot of the dev service settings
     */
    private KafkaDevServiceCfg getConfiguration(KafkaBuildTimeConfig kafkaBuildTimeConfig) {
        KafkaDevServicesBuildTimeConfig devServices = kafkaBuildTimeConfig.devservices;
        boolean enabled = devServices.enabled.orElse(true);
        String image = devServices.imageName;
        Integer fixedPort = devServices.port.orElse(0);
        return new KafkaDevServiceCfg(enabled, image, fixedPort);
    }
}
