package org.apache.pekko.kafka.testkit.internal;

import com.github.dockerjava.api.command.InspectContainerResponse;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.pekko.annotation.InternalApi;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;

/**
 * Testcontainers container that runs a Confluent Platform Kafka broker and, unless an external
 * ZooKeeper connect string is supplied, a ZooKeeper server inside the same container.
 *
 * <p>The container command waits for a wrapper script to be copied in and then runs it. The
 * wrapper polls two marker files: when {@code /tmp/start} exists it launches the generated starter
 * script in the background, and when {@code /tmp/stop} exists it runs
 * {@code /usr/bin/kafka-server-stop}. {@link #startKafka()} and {@link #stopKafka()} create those
 * marker files, so the broker process can be bounced without restarting the container.
 */
@InternalApi
public class PekkoConnectorsKafkaContainer extends GenericContainer<PekkoConnectorsKafkaContainer> {
    private static final String START_STOP_SCRIPT = "/testcontainers_start_stop_wrapper.sh";
    private static final String STARTER_SCRIPT = "/testcontainers_start.sh";
    public static final String DEFAULT_CONFLUENT_PLATFORM_VERSION = "6.1.1";
    public static final DockerImageName DEFAULT_ZOOKEEPER_IMAGE_NAME =
            DockerImageName.parse("confluentinc/cp-zookeeper").withTag(DEFAULT_CONFLUENT_PLATFORM_VERSION);
    public static final DockerImageName DEFAULT_KAFKA_IMAGE_NAME =
            DockerImageName.parse("confluentinc/cp-kafka").withTag(DEFAULT_CONFLUENT_PLATFORM_VERSION);
    public static final int KAFKA_PORT = 9093;
    public static final int KAFKA_JMX_PORT = 49999;
    public static final int ZOOKEEPER_PORT = 2181;
    private static final int PORT_NOT_ASSIGNED = -1;

    /** Port of the {@code BROKER} listener, which is configured as the inter-broker listener. */
    private static final int INTERNAL_BROKER_PORT = 9092;

    /** Marker file whose presence makes the wrapper script launch the starter script. */
    private static final String START_MARKER = "/tmp/start";

    /** Marker file whose presence makes the wrapper script run {@code kafka-server-stop}. */
    private static final String STOP_MARKER = "/tmp/stop";

    /** File mode applied to both scripts copied into the container (rwx for everyone). */
    private static final int SCRIPT_FILE_MODE = 0777;

    /** Content of the wrapper script installed at {@link #START_STOP_SCRIPT}. */
    private static final String START_STOP_WRAPPER =
            "#!/bin/bash\n"
                    + "STARTER_SCRIPT='" + STARTER_SCRIPT + "'\n"
                    + "touch " + START_MARKER + "\n"
                    + "while :; do\n"
                    + "\tif [ -f $STARTER_SCRIPT ]; then\n"
                    + "\t\tif [ -f " + STOP_MARKER + " ]; then rm " + STOP_MARKER + "; /usr/bin/kafka-server-stop;\n"
                    + "\t\telif [ -f " + START_MARKER + " ]; then rm " + START_MARKER + "; bash -c \"$STARTER_SCRIPT &\";fi\n"
                    + "\tfi\n"
                    + "\tsleep 0.1\n"
                    + "done\n";

    /** ZooKeeper connect string to use, or {@code null} to start ZooKeeper inside this container. */
    protected String externalZookeeperConnect;
    private int brokerNum;
    /** Host port mapped to {@link #KAFKA_PORT}; {@link #PORT_NOT_ASSIGNED} until the container starts. */
    private int port;
    /** Host port mapped to {@link #KAFKA_JMX_PORT}; only assigned when remote JMX is enabled. */
    private int jmxPort;
    /** {@code true} until a network is set through {@link #withNetwork(Network)}. */
    private boolean useImplicitNetwork;
    private boolean enableRemoteJmxService;

    /** Creates a container from {@link #DEFAULT_KAFKA_IMAGE_NAME}. */
    public PekkoConnectorsKafkaContainer() {
        this(DEFAULT_KAFKA_IMAGE_NAME);
    }

    /**
     * Creates a container from the given image, attached to {@link Network#SHARED}, exposing
     * {@link #KAFKA_PORT} and pre-configured with single-broker friendly Kafka settings.
     *
     * @param dockerImageName image to run
     */
    public PekkoConnectorsKafkaContainer(DockerImageName dockerImageName) {
        super(dockerImageName);
        this.externalZookeeperConnect = null;
        this.brokerNum = 1;
        this.port = PORT_NOT_ASSIGNED;
        this.jmxPort = PORT_NOT_ASSIGNED;
        this.useImplicitNetwork = true;
        this.enableRemoteJmxService = false;

        // Call the superclass method directly so the shared default still counts as "implicit".
        super.withNetwork(Network.SHARED);
        withExposedPorts(KAFKA_PORT);
        // NOTE(review): brokerNum is already 1 here, so this call returns early without setting
        // the "broker-1" alias or KAFKA_BROKER_ID. Kept as-is to preserve behaviour - confirm
        // whether the default broker is meant to have them.
        withBrokerNum(this.brokerNum);
        withEnv("KAFKA_LISTENERS",
                "PLAINTEXT://0.0.0.0:" + KAFKA_PORT + ",BROKER://0.0.0.0:" + INTERNAL_BROKER_PORT);
        withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
        withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
        withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
        withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
        withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", String.valueOf(Long.MAX_VALUE));
        withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
    }

    /**
     * Sets the network explicitly and records that it is no longer the implicit default, which
     * silences the deprecation warning logged by {@link #getNetwork()}.
     *
     * <p>Overrides the {@code withNetwork} method inherited from {@link GenericContainer}.
     *
     * @param network network to attach the container to
     * @return this container
     */
    public PekkoConnectorsKafkaContainer withNetwork(Network network) {
        this.useImplicitNetwork = false;
        return super.withNetwork(network);
    }

    /**
     * Decompiler-generated alias of {@link #withNetwork(Network)}, retained only so existing
     * references to this name keep compiling.
     *
     * @param network network to attach the container to
     * @return this container
     * @deprecated use {@link #withNetwork(Network)}
     */
    @Deprecated
    public PekkoConnectorsKafkaContainer m11withNetwork(Network network) {
        return withNetwork(network);
    }

    /**
     * Changes the broker number. When the number differs from the current one, the network alias
     * {@code broker-<n>} is added and {@code KAFKA_BROKER_ID} is set to the new number; when it is
     * equal, nothing is changed.
     *
     * @param brokerNum new broker number
     * @return this container
     */
    public PekkoConnectorsKafkaContainer withBrokerNum(int brokerNum) {
        if (brokerNum == this.brokerNum) {
            return this;
        }
        this.brokerNum = brokerNum;
        return super.withNetworkAliases("broker-" + this.brokerNum)
                .withEnv("KAFKA_BROKER_ID", "" + this.brokerNum);
    }

    /**
     * Returns the container's network, logging a deprecation warning (with a stack trace) when no
     * network was set explicitly through {@link #withNetwork(Network)}.
     *
     * <p>Overrides the {@code getNetwork} method inherited from {@link GenericContainer}.
     *
     * @return the network the container is attached to
     */
    public Network getNetwork() {
        if (this.useImplicitNetwork) {
            logger().warn(
                    "Deprecation warning! KafkaContainer#getNetwork without an explicitly set network. Consider using KafkaContainer#withNetwork",
                    new Exception("Deprecated method"));
        }
        return super.getNetwork();
    }

    /** @return the broker number currently configured for this container */
    public int getBrokerNum() {
        return this.brokerNum;
    }

    /**
     * Requests a broker shutdown by creating the stop marker file inside the container.
     *
     * @throws RuntimeException if the command cannot be executed or exits with a non-zero code
     */
    public void stopKafka() {
        touchInContainer(STOP_MARKER);
    }

    /**
     * Requests a broker start by creating the start marker file inside the container.
     *
     * @throws RuntimeException if the command cannot be executed or exits with a non-zero code
     */
    public void startKafka() {
        touchInContainer(START_MARKER);
    }

    /** Runs {@code touch <path>} inside the container and fails on a non-zero exit code. */
    private void touchInContainer(String path) {
        final String command = "touch " + path;
        final Container.ExecResult result;
        try {
            result = execInContainer("sh", "-c", command);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
        if (result.getExitCode() != 0) {
            throw new IllegalStateException(
                    "Command '" + command + "' exited with code " + result.getExitCode()
                            + ": " + result.getStderr());
        }
    }

    /**
     * Makes the container start its own ZooKeeper server (the default).
     *
     * @return this container
     */
    public PekkoConnectorsKafkaContainer withEmbeddedZookeeper() {
        this.externalZookeeperConnect = null;
        return self();
    }

    /**
     * Makes the broker use an external ZooKeeper instead of starting one inside the container.
     *
     * @param connectString value exported as {@code KAFKA_ZOOKEEPER_CONNECT}
     * @return this container
     */
    public PekkoConnectorsKafkaContainer withExternalZookeeper(String connectString) {
        this.externalZookeeperConnect = connectString;
        return self();
    }

    /**
     * Enables remote JMX: {@link #KAFKA_JMX_PORT} is exposed on start and the starter script
     * exports {@code KAFKA_JMX_PORT} and {@code KAFKA_JMX_HOSTNAME}.
     *
     * @return this container
     */
    public PekkoConnectorsKafkaContainer withRemoteJmxService() {
        this.enableRemoteJmxService = true;
        return self();
    }

    /**
     * @return the {@code PLAINTEXT://host:port} address of the externally reachable listener
     * @throws IllegalStateException if the Kafka port has not been mapped yet
     */
    public String getBootstrapServers() {
        if (this.port == PORT_NOT_ASSIGNED) {
            throw new IllegalStateException("You should start Kafka container first");
        }
        return String.format("PLAINTEXT://%s:%s", getContainerIpAddress(), this.port);
    }

    /**
     * @return the JMX RMI service URL of the broker
     * @throws IllegalStateException if no JMX port has been mapped, i.e. the container has not
     *     started or {@link #withRemoteJmxService()} was not enabled before starting
     */
    public String getJmxServiceUrl() {
        if (this.jmxPort == PORT_NOT_ASSIGNED) {
            throw new IllegalStateException("You should start Kafka container first");
        }
        return String.format(
                "service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi", getContainerIpAddress(), this.jmxPort);
    }

    /**
     * Sets the container command to wait for the wrapper script and run it, exposes the ZooKeeper
     * port when ZooKeeper is embedded and the JMX port when remote JMX is enabled, then delegates
     * to the superclass.
     *
     * <p>Overrides the {@code doStart} method inherited from {@link GenericContainer}.
     */
    protected void doStart() {
        // The wrapper script is only copied in once the container is starting, so block until it exists.
        withCommand(
                "sh",
                "-c",
                "while [ ! -f " + START_STOP_SCRIPT + " ]; do sleep 0.1; done; " + START_STOP_SCRIPT);
        if (this.externalZookeeperConnect == null) {
            addExposedPort(ZOOKEEPER_PORT);
        }
        if (this.enableRemoteJmxService) {
            addExposedPort(KAFKA_JMX_PORT);
        }
        super.doStart();
    }

    /**
     * Records the mapped Kafka (and, if enabled, JMX) port and, unless {@code reused} is true,
     * copies the generated starter script and the start/stop wrapper script into the container.
     *
     * <p>Overrides the {@code containerIsStarting} hook inherited from {@link GenericContainer}.
     *
     * @param containerInfo inspection result used to discover the container's network IP addresses
     * @param reused flag passed by the superclass hook; when true no scripts are copied
     *     (presumably because a reused container already has them - confirm against Testcontainers)
     * @throws RuntimeException wrapping any failure raised while preparing the container
     */
    protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) {
        try {
            super.containerIsStarting(containerInfo, reused);
            this.port = getMappedPort(KAFKA_PORT);
            if (this.enableRemoteJmxService) {
                this.jmxPort = getMappedPort(KAFKA_JMX_PORT);
            }
            if (reused) {
                return;
            }
            copyFileToContainer(
                    Transferable.of(
                            buildStarterScript(containerInfo).getBytes(StandardCharsets.UTF_8),
                            SCRIPT_FILE_MODE),
                    STARTER_SCRIPT);
            copyFileToContainer(
                    Transferable.of(START_STOP_WRAPPER.getBytes(StandardCharsets.UTF_8), SCRIPT_FILE_MODE),
                    START_STOP_SCRIPT);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the bash script that optionally starts ZooKeeper, exports the broker's ZooKeeper,
     * advertised-listener and JMX settings, and then runs the Confluent image's launch scripts.
     */
    private String buildStarterScript(InspectContainerResponse containerInfo) {
        final StringBuilder script = new StringBuilder("#!/bin/bash\n");

        final String zookeeperConnect;
        if (this.externalZookeeperConnect != null) {
            zookeeperConnect = this.externalZookeeperConnect;
        } else {
            zookeeperConnect = "localhost:" + ZOOKEEPER_PORT;
            script.append("echo 'clientPort=").append(ZOOKEEPER_PORT).append("' > zookeeper.properties\n")
                    .append("echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties\n")
                    .append("echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties\n")
                    .append("zookeeper-server-start zookeeper.properties &\n");
        }

        // One IP address per Docker network the container is attached to.
        final List<String> networkIps =
                containerInfo.getNetworkSettings().getNetworks().values().stream()
                        .map(network -> network.getIpAddress())
                        .collect(Collectors.toList());

        // Advertise the host-mapped PLAINTEXT listener first, then one BROKER listener per network IP.
        final String advertisedListeners =
                Stream.concat(
                                Stream.of(getBootstrapServers()),
                                networkIps.stream().map(ip -> "BROKER://" + ip + ":" + INTERNAL_BROKER_PORT))
                        .collect(Collectors.joining(","));

        script.append("export KAFKA_ZOOKEEPER_CONNECT='").append(zookeeperConnect).append("'\n")
                .append("export KAFKA_ADVERTISED_LISTENERS='").append(advertisedListeners).append("'\n");

        if (this.enableRemoteJmxService) {
            final String jmxHost =
                    networkIps.stream()
                            .findFirst()
                            .orElseThrow(() -> new IllegalStateException("Could not find IP address for JMX"));
            script.append("export KAFKA_JMX_PORT='").append(KAFKA_JMX_PORT).append("' \n")
                    .append("export KAFKA_JMX_HOSTNAME='").append(jmxHost).append("' \n");
        }

        return script.append(". /etc/confluent/docker/bash-config \n")
                .append("/etc/confluent/docker/configure \n")
                .append("/etc/confluent/docker/launch \n")
                .toString();
    }
}
