package org.apache.pekko.kafka.testkit.internal;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.pekko.annotation.InternalApi;
import org.rnorth.ducttape.unreliables.Unreliables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;

@InternalApi
/* loaded from: input_file:org/apache/pekko/kafka/testkit/internal/KafkaContainerCluster.class */
public class KafkaContainerCluster implements Startable {
    private static final String LOGGING_NAMESPACE_PREFIX = "pekko.kafka.testkit.testcontainers.logs";
    private static final String READINESS_CHECK_SCRIPT = "/testcontainers_readiness_check.sh";
    private static final String READINESS_CHECK_TOPIC = "ready-kafka-container-cluster";
    private final Logger log;
    private final Version kafkaImageTag;
    private final int brokersNum;
    private final Boolean useSchemaRegistry;
    private final Boolean containerLogging;
    private final Duration clusterStartTimeout;
    private final Duration readinessCheckTimeout;
    private final Network network;
    private final GenericContainer zookeeper;
    private final Collection<PekkoConnectorsKafkaContainer> brokers;
    private DockerImageName schemaRegistryImage;
    private Optional<SchemaRegistryContainer> schemaRegistry;
    public static final DockerImageName DEFAULT_ZOOKEEPER_IMAGE_NAME = PekkoConnectorsKafkaContainer.DEFAULT_ZOOKEEPER_IMAGE_NAME;
    public static final DockerImageName DEFAULT_KAFKA_IMAGE_NAME = PekkoConnectorsKafkaContainer.DEFAULT_KAFKA_IMAGE_NAME;
    public static final DockerImageName DEFAULT_SCHEMA_REGISTRY_IMAGE_NAME = SchemaRegistryContainer.DEFAULT_SCHEMA_REGISTRY_IMAGE_NAME;
    public static final Duration DEFAULT_CLUSTER_START_TIMEOUT = Duration.ofSeconds(360);
    public static final Duration DEFAULT_READINESS_CHECK_TIMEOUT = DEFAULT_CLUSTER_START_TIMEOUT;
    private static final Version BOOTSTRAP_PARAM_MIN_VERSION = new Version("5.2.0");

    public KafkaContainerCluster(int i, int i2) {
        this(DEFAULT_ZOOKEEPER_IMAGE_NAME, DEFAULT_KAFKA_IMAGE_NAME, DEFAULT_SCHEMA_REGISTRY_IMAGE_NAME, i, i2, false, false, DEFAULT_CLUSTER_START_TIMEOUT, DEFAULT_READINESS_CHECK_TIMEOUT);
    }

    public KafkaContainerCluster(DockerImageName dockerImageName, DockerImageName dockerImageName2, DockerImageName dockerImageName3, int i, int i2, boolean z, boolean z2, Duration duration, Duration duration2) {
        this.log = LoggerFactory.getLogger(getClass());
        this.schemaRegistry = Optional.empty();
        if (i < 0) {
            throw new IllegalArgumentException("brokersNum '" + i + "' must be greater than 0");
        }
        if (i2 < 0 || i2 > i) {
            throw new IllegalArgumentException("internalTopicsRf '" + i2 + "' must be less than brokersNum and greater than 0");
        }
        this.kafkaImageTag = new Version(dockerImageName2.getVersionPart());
        this.brokersNum = i;
        this.useSchemaRegistry = Boolean.valueOf(z);
        this.containerLogging = Boolean.valueOf(z2);
        this.clusterStartTimeout = duration;
        this.readinessCheckTimeout = duration2;
        this.network = Network.newNetwork();
        this.schemaRegistryImage = dockerImageName3;
        this.zookeeper = new GenericContainer(dockerImageName).withNetwork(this.network).withNetworkAliases(new String[]{"zookeeper"}).withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(PekkoConnectorsKafkaContainer.ZOOKEEPER_PORT));
        this.brokers = (Collection) IntStream.range(0, this.brokersNum).mapToObj(i3 -> {
            return (PekkoConnectorsKafkaContainer) ((PekkoConnectorsKafkaContainer) ((PekkoConnectorsKafkaContainer) ((PekkoConnectorsKafkaContainer) ((PekkoConnectorsKafkaContainer) ((PekkoConnectorsKafkaContainer) new PekkoConnectorsKafkaContainer(dockerImageName2).m11withNetwork(this.network).withBrokerNum(i3).withRemoteJmxService().dependsOn(new Startable[]{this.zookeeper})).withExternalZookeeper("zookeeper:2181").withEnv("KAFKA_BROKER_ID", i3 + "")).withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", i2 + "")).withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", i2 + "")).withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", i2 + "")).withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", i2 + "");
        }).collect(Collectors.toList());
        if (z) {
            this.schemaRegistry = Optional.of(((SchemaRegistryContainer) new SchemaRegistryContainer(this.schemaRegistryImage).withNetworkAliases(new String[]{"schema-registry"})).withCluster(this));
        } else {
            this.schemaRegistry = Optional.empty();
        }
    }

    public Network getNetwork() {
        return this.network;
    }

    public GenericContainer getZooKeeper() {
        return this.zookeeper;
    }

    public Optional<SchemaRegistryContainer> getSchemaRegistry() {
        return this.schemaRegistry;
    }

    public Collection<PekkoConnectorsKafkaContainer> getBrokers() {
        return this.brokers;
    }

    public String getBootstrapServers() {
        return (String) this.brokers.stream().map((v0) -> {
            return v0.getBootstrapServers();
        }).collect(Collectors.joining(","));
    }

    public String getInternalNetworkBootstrapServers() {
        return (String) IntStream.range(0, this.brokersNum).mapToObj(i -> {
            return String.format("broker-%s:%s", Integer.valueOf(i), "9092");
        }).collect(Collectors.joining(","));
    }

    private <T> Stream<T> optionalStream(Optional<T> optional) {
        return optional.isPresent() ? Stream.of(optional.get()) : Stream.empty();
    }

    private Stream<GenericContainer> allContainers() {
        return Stream.concat(Stream.concat(this.brokers.stream(), Stream.of(this.zookeeper)), optionalStream(this.schemaRegistry));
    }

    public void start() {
        try {
            configureContainerLogging();
            Stream<PekkoConnectorsKafkaContainer> stream = this.brokers.stream();
            Class<Startable> cls = Startable.class;
            Startable.class.getClass();
            Startables.deepStart(stream.map((v1) -> {
                return r1.cast(v1);
            })).get(this.clusterStartTimeout.getSeconds(), TimeUnit.SECONDS);
            this.brokers.stream().findFirst().ifPresent(pekkoConnectorsKafkaContainer -> {
                pekkoConnectorsKafkaContainer.copyFileToContainer(Transferable.of(readinessCheckScript().getBytes(StandardCharsets.UTF_8), 511), READINESS_CHECK_SCRIPT);
            });
            waitForClusterFormation();
            Startables.deepStart(optionalStream(this.schemaRegistry)).get(this.clusterStartTimeout.getSeconds(), TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void configureContainerLogging() {
        if (this.containerLogging.booleanValue()) {
            this.log.debug("Testcontainer logging enabled");
            this.brokers.forEach(pekkoConnectorsKafkaContainer -> {
                setContainerLogger("pekko.kafka.testkit.testcontainers.logs.broker.broker-" + pekkoConnectorsKafkaContainer.getBrokerNum(), pekkoConnectorsKafkaContainer);
            });
            setContainerLogger("pekko.kafka.testkit.testcontainers.logs.zookeeper", this.zookeeper);
            this.schemaRegistry.ifPresent(schemaRegistryContainer -> {
                setContainerLogger("pekko.kafka.testkit.testcontainers.logs.schemaregistry", schemaRegistryContainer);
            });
        }
    }

    private void setContainerLogger(String str, GenericContainer<?> genericContainer) {
        genericContainer.withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger(str)));
    }

    private void waitForClusterFormation() {
        runReadinessCheck("Readiness check (1/2). ZooKeeper state updated.", () -> {
            String stdout = this.zookeeper.execInContainer(new String[]{"sh", "-c", "zookeeper-shell zookeeper:2181 ls /brokers/ids | tail -n 1"}).getStdout();
            return Boolean.valueOf(stdout != null && stdout.split(",").length == this.brokersNum);
        });
        runReadinessCheck("Readiness check (2/2). Run producer consumer with acks=all.", () -> {
            return (Boolean) this.brokers.stream().findFirst().map((v1) -> {
                return runReadinessCheck(v1);
            }).orElse(false);
        });
    }

    public void stopKafka() {
        this.brokers.forEach((v0) -> {
            v0.stopKafka();
        });
    }

    public void startKafka() {
        this.brokers.forEach((v0) -> {
            v0.startKafka();
        });
        waitForClusterFormation();
    }

    private void runReadinessCheck(String str, Callable<Boolean> callable) {
        try {
            this.log.debug("Start: {}", str);
            Unreliables.retryUntilTrue((int) this.readinessCheckTimeout.getSeconds(), TimeUnit.SECONDS, callable);
            this.log.debug("Passed: {}", str);
        } catch (Throwable th) {
            this.log.error("Failed: {}", str);
            throw th;
        }
    }

    private String readinessCheckScript() {
        String kafkaTopicConnectParam = kafkaTopicConnectParam();
        return ((((((("#!/bin/bash \nset -e \n") + "[[ $(kafka-topics " + kafkaTopicConnectParam + " --describe --topic " + READINESS_CHECK_TOPIC + " | wc -l) > 1 ]] && kafka-topics " + kafkaTopicConnectParam + " --delete --topic " + READINESS_CHECK_TOPIC + " \n") + "kafka-topics " + kafkaTopicConnectParam + " --topic " + READINESS_CHECK_TOPIC + " --create --partitions " + this.brokersNum + " --replication-factor " + this.brokersNum + " --config min.insync.replicas=" + this.brokersNum + " \n") + "MESSAGE=\"`date -u`\" \n") + "echo \"$MESSAGE\" | kafka-console-producer --broker-list localhost:9092 --topic ready-kafka-container-cluster --producer-property acks=all \n") + "kafka-console-consumer --bootstrap-server localhost:9092 --topic ready-kafka-container-cluster --from-beginning --timeout-ms 2000 --max-messages 1 | grep \"$MESSAGE\" \n") + "kafka-topics " + kafkaTopicConnectParam + " --delete --topic " + READINESS_CHECK_TOPIC + " \n") + "echo \"test succeeded\" \n";
    }

    private String kafkaTopicConnectParam() {
        return this.kafkaImageTag.compareTo(BOOTSTRAP_PARAM_MIN_VERSION) >= 0 ? "--bootstrap-server localhost:9092" : "--zookeeper zookeeper:2181";
    }

    private Boolean runReadinessCheck(GenericContainer genericContainer) {
        try {
            Container.ExecResult execInContainer = genericContainer.execInContainer(new String[]{"sh", "-c", READINESS_CHECK_SCRIPT});
            if (execInContainer.getExitCode() == 0 && execInContainer.getStdout().contains("test succeeded")) {
                return true;
            }
            this.log.debug("Readiness check returned errors:\nSTDOUT:\n{}\nSTDERR\n{}", execInContainer.getStdout(), execInContainer.getStderr());
            return false;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void stop() {
        ((Stream) allContainers().parallel()).forEach((v0) -> {
            v0.stop();
        });
    }
}
