/*
 * Decompiled with CFR 0.152.
 */
package io.strimzi.test.container;

import io.strimzi.test.container.DoNotMutate;
import io.strimzi.test.container.KafkaContainer;
import io.strimzi.test.container.KafkaNodeRole;
import io.strimzi.test.container.KafkaVersionService;
import io.strimzi.test.container.StrimziKafkaContainer;
import io.strimzi.test.container.Utils;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.lifecycle.Startables;

public class StrimziKafkaCluster
implements KafkaContainer {
    private static final Logger LOGGER = LoggerFactory.getLogger(StrimziKafkaCluster.class);
    private final int brokersNum;
    private final int controllersNum;
    private final int internalTopicReplicationFactor;
    private final Map<String, String> additionalKafkaConfiguration;
    private final ToxiproxyContainer proxyContainer;
    private final boolean enableSharedNetwork;
    private final String kafkaVersion;
    private final boolean useDedicatedRoles;
    private final String logFilePath;
    private final Network network;
    private Collection<KafkaContainer> nodes;
    private Collection<KafkaContainer> controllers;
    private Collection<KafkaContainer> brokers;
    private final String clusterId;

    private StrimziKafkaCluster(StrimziKafkaClusterBuilder builder) {
        this.brokersNum = builder.brokersNum;
        this.controllersNum = builder.controllersNum;
        this.useDedicatedRoles = builder.useDedicatedRoles;
        this.enableSharedNetwork = builder.enableSharedNetwork;
        this.network = this.enableSharedNetwork ? Network.SHARED : Network.newNetwork();
        this.internalTopicReplicationFactor = builder.internalTopicReplicationFactor == 0 ? this.brokersNum : builder.internalTopicReplicationFactor;
        this.additionalKafkaConfiguration = builder.additionalKafkaConfiguration;
        this.proxyContainer = builder.proxyContainer;
        this.kafkaVersion = builder.kafkaVersion;
        this.clusterId = builder.clusterId;
        this.logFilePath = builder.logFilePath;
        this.validateBrokerNum(this.brokersNum);
        if (this.isUsingDedicatedRoles()) {
            this.validateControllerNum(this.controllersNum);
        }
        this.validateInternalTopicReplicationFactor(this.internalTopicReplicationFactor, this.brokersNum);
        if (this.proxyContainer != null) {
            this.proxyContainer.setNetwork(this.network);
        }
        this.prepareKafkaCluster(this.additionalKafkaConfiguration, this.kafkaVersion);
    }

    private void prepareKafkaCluster(Map<String, String> additionalKafkaConfiguration, String kafkaVersion) {
        HashMap<String, String> defaultKafkaConfigurationForMultiNode = new HashMap<String, String>();
        defaultKafkaConfigurationForMultiNode.put("offsets.topic.replication.factor", String.valueOf(this.internalTopicReplicationFactor));
        defaultKafkaConfigurationForMultiNode.put("num.partitions", String.valueOf(this.internalTopicReplicationFactor));
        defaultKafkaConfigurationForMultiNode.put("transaction.state.log.replication.factor", String.valueOf(this.internalTopicReplicationFactor));
        defaultKafkaConfigurationForMultiNode.put("transaction.state.log.min.isr", String.valueOf(this.internalTopicReplicationFactor));
        this.configureQuorumVoters(additionalKafkaConfiguration);
        if (additionalKafkaConfiguration != null) {
            defaultKafkaConfigurationForMultiNode.putAll(additionalKafkaConfiguration);
        }
        if (this.useDedicatedRoles) {
            this.prepareDedicatedRolesCluster(defaultKafkaConfigurationForMultiNode, kafkaVersion);
        } else {
            this.prepareCombinedRolesCluster(defaultKafkaConfigurationForMultiNode, kafkaVersion);
        }
    }

    private void prepareCombinedRolesCluster(Map<String, String> kafkaConfiguration, String kafkaVersion) {
        this.nodes = IntStream.range(0, this.brokersNum).mapToObj(brokerId -> {
            LOGGER.info("Starting combined-role node with id {}", (Object)brokerId);
            StrimziKafkaContainer kafkaContainer = ((StrimziKafkaContainer)new StrimziKafkaContainer().withBrokerId(brokerId).withKafkaConfigurationMap(kafkaConfiguration).withNetwork(this.network)).withProxyContainer(this.proxyContainer).withKafkaVersion(kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() : kafkaVersion).withNodeId(brokerId).withClusterId(this.clusterId).withNodeRole(KafkaNodeRole.COMBINED).waitForRunning();
            if (this.logFilePath != null) {
                kafkaContainer.withLogCollection(this.logFilePath);
            }
            LOGGER.info("Started combined role node with id: {}", (Object)kafkaContainer);
            return kafkaContainer;
        }).collect(Collectors.toList());
    }

    private void prepareDedicatedRolesCluster(Map<String, String> kafkaConfiguration, String kafkaVersion) {
        this.controllers = IntStream.range(0, this.controllersNum).mapToObj(controllerId -> {
            LOGGER.info("Starting controller-only node with id {}", (Object)controllerId);
            StrimziKafkaContainer controllerContainer = ((StrimziKafkaContainer)new StrimziKafkaContainer().withBrokerId(controllerId).withKafkaConfigurationMap(kafkaConfiguration).withNetwork(this.network)).withKafkaVersion(kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() : kafkaVersion).withNodeId(controllerId).withClusterId(this.clusterId).withNodeRole(KafkaNodeRole.CONTROLLER).waitForRunning();
            if (this.logFilePath != null) {
                controllerContainer.withLogCollection(this.logFilePath);
            }
            LOGGER.info("Started controller-only node with id: {}", (Object)controllerContainer);
            return controllerContainer;
        }).collect(Collectors.toList());
        this.brokers = IntStream.range(0, this.brokersNum).mapToObj(brokerIndex -> {
            int brokerId = this.controllersNum + brokerIndex;
            LOGGER.info("Starting broker-only node with broker.id={}", (Object)brokerId);
            StrimziKafkaContainer brokerContainer = ((StrimziKafkaContainer)new StrimziKafkaContainer().withBrokerId(brokerId).withKafkaConfigurationMap(kafkaConfiguration).withNetwork(this.network)).withProxyContainer(this.proxyContainer).withKafkaVersion(kafkaVersion == null ? KafkaVersionService.getInstance().latestRelease().getVersion() : kafkaVersion).withNodeId(brokerId).withClusterId(this.clusterId).withNodeRole(KafkaNodeRole.BROKER).waitForRunning();
            if (this.logFilePath != null) {
                brokerContainer.withLogCollection(this.logFilePath);
            }
            LOGGER.info("Started broker-only node with id: {}", (Object)brokerContainer);
            return brokerContainer;
        }).collect(Collectors.toList());
        this.nodes = new ArrayList<KafkaContainer>();
        this.nodes.addAll(this.controllers);
        this.nodes.addAll(this.brokers);
    }

    private void validateBrokerNum(int brokersNum) {
        if (brokersNum <= 0) {
            throw new IllegalArgumentException("brokersNum '" + brokersNum + "' must be greater than 0");
        }
    }

    private void validateControllerNum(int controllersNum) {
        if (controllersNum <= 0) {
            throw new IllegalArgumentException("controllersNum '" + controllersNum + "' must be greater than 0");
        }
    }

    private void validateInternalTopicReplicationFactor(int internalTopicReplicationFactor, int brokersNum) {
        if (internalTopicReplicationFactor < 1 || internalTopicReplicationFactor > brokersNum) {
            throw new IllegalArgumentException("internalTopicReplicationFactor '" + internalTopicReplicationFactor + "' must be between 1 and " + brokersNum);
        }
    }

    public Collection<GenericContainer<?>> getNodes() {
        return this.nodes.stream().map(node -> (GenericContainer)node).collect(Collectors.toList());
    }

    @DoNotMutate
    public String getNetworkBootstrapServers() {
        return this.getBrokers().stream().map(broker -> ((StrimziKafkaContainer)broker).getNetworkBootstrapServers()).collect(Collectors.joining(","));
    }

    @Override
    public String getBootstrapServers() {
        return this.getBrokers().stream().map(KafkaContainer::getBootstrapServers).collect(Collectors.joining(","));
    }

    @Override
    public String getBootstrapControllers() {
        return this.getControllers().stream().map(KafkaContainer::getBootstrapControllers).collect(Collectors.joining(","));
    }

    @DoNotMutate
    public String getNetworkBootstrapControllers() {
        return this.getControllers().stream().map(controller -> ((StrimziKafkaContainer)controller).getNetworkBootstrapControllers()).collect(Collectors.joining(","));
    }

    int getInternalTopicReplicationFactor() {
        return this.internalTopicReplicationFactor;
    }

    boolean isSharedNetworkEnabled() {
        return this.enableSharedNetwork;
    }

    Map<String, String> getAdditionalKafkaConfiguration() {
        return this.additionalKafkaConfiguration;
    }

    private void configureQuorumVoters(Map<String, String> additionalKafkaConfiguration) {
        String quorumVoters = this.useDedicatedRoles ? IntStream.range(0, this.controllersNum).mapToObj(controllerId -> String.format("%d@broker-%d:9094", controllerId, controllerId)).collect(Collectors.joining(",")) : IntStream.range(0, this.brokersNum).mapToObj(brokerId -> String.format("%d@broker-%d:9094", brokerId, brokerId)).collect(Collectors.joining(","));
        additionalKafkaConfiguration.put("controller.quorum.voters", quorumVoters);
    }

    @DoNotMutate
    public void start() {
        Stream<KafkaContainer> startables = this.nodes.stream();
        try {
            Startables.deepStart(startables).get(60L, TimeUnit.SECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while starting Kafka containers", e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException("Failed to start Kafka containers", e);
        }
        catch (TimeoutException e) {
            throw new RuntimeException("Timed out while starting Kafka containers", e);
        }
        Utils.waitFor("Kafka brokers to form a quorum", Duration.ofSeconds(1L), Duration.ofMinutes(1L), this::checkAllBrokersReady);
    }

    @DoNotMutate
    private boolean checkAllBrokersReady() {
        try {
            Collection<KafkaContainer> brokersToCheck = this.getBrokers();
            for (KafkaContainer kafkaContainer : brokersToCheck) {
                if (this.isBrokerReady((StrimziKafkaContainer)kafkaContainer)) continue;
                return false;
            }
            return true;
        }
        catch (IOException | InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to execute command in Kafka container", e);
        }
    }

    @DoNotMutate
    private boolean isBrokerReady(StrimziKafkaContainer kafkaContainer) throws IOException, InterruptedException {
        Container.ExecResult result = kafkaContainer.execInContainer(new String[]{"bash", "-c", "bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9091 describe --status"});
        String output = result.getStdout();
        LOGGER.info("Metadata quorum status from broker {}: {}", (Object)kafkaContainer.getBrokerId(), (Object)output);
        if (output == null || output.isEmpty()) {
            return false;
        }
        return this.isValidLeaderIdPresent(output);
    }

    @DoNotMutate
    private boolean isValidLeaderIdPresent(String output) {
        Pattern leaderIdPattern = Pattern.compile("LeaderId:\\s+(\\d+)");
        Matcher leaderIdMatcher = leaderIdPattern.matcher(output);
        if (!leaderIdMatcher.find()) {
            return false;
        }
        String leaderIdStr = leaderIdMatcher.group(1);
        try {
            int leaderId = Integer.parseInt(leaderIdStr);
            return leaderId >= 0;
        }
        catch (NumberFormatException e) {
            return false;
        }
    }

    @DoNotMutate
    public void stop() {
        ((Stream)this.nodes.stream().parallel()).forEach(Startable::stop);
    }

    protected Network getNetwork() {
        return this.network;
    }

    public Collection<KafkaContainer> getControllers() {
        if (this.useDedicatedRoles) {
            return this.controllers;
        }
        return this.nodes;
    }

    public Collection<KafkaContainer> getBrokers() {
        if (this.useDedicatedRoles) {
            return this.brokers;
        }
        return this.nodes;
    }

    public boolean isUsingDedicatedRoles() {
        return this.useDedicatedRoles;
    }

    public static class StrimziKafkaClusterBuilder {
        private int brokersNum;
        private int controllersNum;
        private boolean useDedicatedRoles;
        private int internalTopicReplicationFactor;
        private Map<String, String> additionalKafkaConfiguration = new HashMap<String, String>();
        private ToxiproxyContainer proxyContainer;
        private boolean enableSharedNetwork;
        private String kafkaVersion;
        private String clusterId;
        private String logFilePath;

        public StrimziKafkaClusterBuilder withNumberOfBrokers(int brokersNum) {
            this.brokersNum = brokersNum;
            return this;
        }

        public StrimziKafkaClusterBuilder withInternalTopicReplicationFactor(int internalTopicReplicationFactor) {
            this.internalTopicReplicationFactor = internalTopicReplicationFactor;
            return this;
        }

        public StrimziKafkaClusterBuilder withAdditionalKafkaConfiguration(Map<String, String> additionalKafkaConfiguration) {
            if (additionalKafkaConfiguration != null) {
                this.additionalKafkaConfiguration.putAll(additionalKafkaConfiguration);
            }
            return this;
        }

        public StrimziKafkaClusterBuilder withProxyContainer(ToxiproxyContainer proxyContainer) {
            this.proxyContainer = proxyContainer;
            return this;
        }

        public StrimziKafkaClusterBuilder withSharedNetwork() {
            this.enableSharedNetwork = true;
            return this;
        }

        public StrimziKafkaClusterBuilder withKafkaVersion(String kafkaVersion) {
            this.kafkaVersion = kafkaVersion;
            return this;
        }

        public StrimziKafkaClusterBuilder withDedicatedRoles() {
            this.useDedicatedRoles = true;
            return this;
        }

        public StrimziKafkaClusterBuilder withNumberOfControllers(int controllersNum) {
            this.controllersNum = controllersNum;
            return this;
        }

        public StrimziKafkaClusterBuilder withLogCollection() {
            this.logFilePath = "target/strimzi-test-container-logs/";
            return this;
        }

        public StrimziKafkaClusterBuilder withLogCollection(String logFilePath) {
            if (logFilePath == null || logFilePath.trim().isEmpty()) {
                throw new IllegalArgumentException("Log file path cannot be null or empty.");
            }
            this.logFilePath = logFilePath.trim();
            return this;
        }

        public StrimziKafkaCluster build() {
            this.clusterId = UUID.randomUUID().toString();
            return new StrimziKafkaCluster(this);
        }
    }
}

