package kafka.test.junit;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.api.IntegrationTestHarness;
import kafka.network.SocketServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.server.KafkaServer;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.test.annotation.Type;
import kafka.utils.EmptyTestInfo;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.compat.java8.OptionConverters;

/* loaded from: input_file:kafka/test/junit/ZkClusterInvocationContext.class */
public class ZkClusterInvocationContext implements TestTemplateInvocationContext {
    private final String baseDisplayName;
    private final ClusterConfig clusterConfig;
    private final AtomicReference<ClusterConfigurableIntegrationHarness> clusterReference = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kafka/test/junit/ZkClusterInvocationContext$ClusterConfigurableIntegrationHarness.class */
    public static class ClusterConfigurableIntegrationHarness extends IntegrationTestHarness {
        private ClusterConfig clusterConfig;

        private ClusterConfigurableIntegrationHarness(ClusterConfig clusterConfig) {
            this.clusterConfig = (ClusterConfig) Objects.requireNonNull(clusterConfig);
        }

        public void setClusterConfig(ClusterConfig clusterConfig) {
            this.clusterConfig = (ClusterConfig) Objects.requireNonNull(clusterConfig);
        }

        @Override // kafka.api.IntegrationTestHarness
        public void modifyConfigs(Seq<Properties> seq) {
            super.modifyConfigs(seq);
            for (int i = 0; i < seq.length(); i++) {
                ((Properties) seq.apply(i)).putAll(this.clusterConfig.perServerOverrideProperties().getOrDefault(Integer.valueOf(i), Collections.emptyMap()));
            }
        }

        @Override // kafka.api.IntegrationTestHarness
        public Properties serverConfig() {
            Properties properties = new Properties();
            properties.putAll(this.clusterConfig.serverProperties());
            properties.put("inter.broker.protocol.version", this.clusterConfig.metadataVersion().versionWithSuffix());
            return properties;
        }

        @Override // kafka.api.IntegrationTestHarness
        public Properties adminClientConfig() {
            Properties properties = new Properties();
            properties.putAll(this.clusterConfig.adminClientProperties());
            return properties;
        }

        @Override // kafka.api.IntegrationTestHarness
        public Properties consumerConfig() {
            Properties properties = new Properties();
            properties.putAll(this.clusterConfig.consumerProperties());
            return properties;
        }

        @Override // kafka.api.IntegrationTestHarness
        public Properties producerConfig() {
            Properties properties = new Properties();
            properties.putAll(this.clusterConfig.producerProperties());
            return properties;
        }

        @Override // kafka.integration.KafkaServerTestHarness
        public SecurityProtocol securityProtocol() {
            return this.clusterConfig.securityProtocol();
        }

        @Override // kafka.integration.KafkaServerTestHarness
        public ListenerName listenerName() {
            return (ListenerName) this.clusterConfig.listenerName().map(ListenerName::normalised).orElseGet(() -> {
                return ListenerName.forSecurityProtocol(securityProtocol());
            });
        }

        @Override // kafka.integration.KafkaServerTestHarness
        /* renamed from: serverSaslProperties */
        public Option<Properties> mo23serverSaslProperties() {
            if (this.clusterConfig.saslServerProperties().isEmpty()) {
                return Option.empty();
            }
            Properties properties = new Properties();
            properties.putAll(this.clusterConfig.saslServerProperties());
            return Option.apply(properties);
        }

        @Override // kafka.integration.KafkaServerTestHarness
        /* renamed from: clientSaslProperties */
        public Option<Properties> mo22clientSaslProperties() {
            if (this.clusterConfig.saslClientProperties().isEmpty()) {
                return Option.empty();
            }
            Properties properties = new Properties();
            properties.putAll(this.clusterConfig.saslClientProperties());
            return Option.apply(properties);
        }

        @Override // kafka.api.IntegrationTestHarness
        public int brokerCount() {
            return this.clusterConfig.numBrokers();
        }

        @Override // kafka.api.IntegrationTestHarness
        public int logDirCount() {
            return this.clusterConfig.numDisksPerBroker();
        }

        @Override // kafka.integration.KafkaServerTestHarness
        /* renamed from: trustStoreFile */
        public Option<File> mo24trustStoreFile() {
            return OptionConverters.toScala(this.clusterConfig.trustStoreFile());
        }
    }

    /* loaded from: input_file:kafka/test/junit/ZkClusterInvocationContext$ZkClusterInstance.class */
    public static class ZkClusterInstance implements ClusterInstance {
        final AtomicReference<ClusterConfigurableIntegrationHarness> clusterReference;
        final ClusterConfig config;
        final AtomicBoolean started = new AtomicBoolean(false);
        final AtomicBoolean stopped = new AtomicBoolean(false);

        /* JADX INFO: Access modifiers changed from: package-private */
        public ZkClusterInstance(ClusterConfig clusterConfig, AtomicReference<ClusterConfigurableIntegrationHarness> atomicReference) {
            this.config = clusterConfig;
            this.clusterReference = atomicReference;
        }

        @Override // kafka.test.ClusterInstance
        public String bootstrapServers() {
            return TestUtils.bootstrapServers(this.clusterReference.get().servers(), this.clusterReference.get().listenerName());
        }

        @Override // kafka.test.ClusterInstance
        public String bootstrapControllers() {
            throw new RuntimeException("Cannot use --bootstrap-controller with ZK-based clusters.");
        }

        @Override // kafka.test.ClusterInstance
        public ListenerName clientListener() {
            return this.clusterReference.get().listenerName();
        }

        @Override // kafka.test.ClusterInstance
        public Optional<ListenerName> controlPlaneListenerName() {
            return OptionConverters.toJava(((KafkaServer) this.clusterReference.get().servers().head()).config().controlPlaneListenerName());
        }

        @Override // kafka.test.ClusterInstance
        public ListenerName controllerListener() {
            return clientListener();
        }

        @Override // kafka.test.ClusterInstance
        public Collection<SocketServer> controllerSocketServers() {
            return (Collection) brokers().values().stream().filter(kafkaBroker -> {
                return ((KafkaServer) kafkaBroker).kafkaController().isActive();
            }).map((v0) -> {
                return v0.socketServer();
            }).collect(Collectors.toList());
        }

        @Override // kafka.test.ClusterInstance
        public SocketServer anyNonControllerBrokerSocketServer() {
            return (SocketServer) brokers().values().stream().filter(kafkaBroker -> {
                return !((KafkaServer) kafkaBroker).kafkaController().isActive();
            }).map((v0) -> {
                return v0.socketServer();
            }).findFirst().orElseThrow(() -> {
                return new RuntimeException("No broker SocketServers found");
            });
        }

        @Override // kafka.test.ClusterInstance
        public SocketServer activeController() {
            return anyControllerSocketServer();
        }

        @Override // kafka.test.ClusterInstance
        public SocketServer linkCoordinator(String str) {
            List list = (List) brokers().values().stream().filter(kafkaBroker -> {
                return kafkaBroker.clusterLinkManager().isLinkCoordinator(str);
            }).map((v0) -> {
                return v0.socketServer();
            }).collect(Collectors.toList());
            if (list.size() != 1) {
                throw new IllegalStateException("Failed to find a single link coordinator for " + str);
            }
            return (SocketServer) list.get(0);
        }

        @Override // kafka.test.ClusterInstance
        public String clusterId() {
            return (String) brokers().values().stream().findFirst().map((v0) -> {
                return v0.clusterId();
            }).orElseThrow(() -> {
                return new RuntimeException("No broker instances found");
            });
        }

        @Override // kafka.test.ClusterInstance
        public Type type() {
            return Type.ZK;
        }

        @Override // kafka.test.ClusterInstance
        public ClusterConfig config() {
            return this.config;
        }

        @Override // kafka.test.ClusterInstance
        public Set<Integer> controllerIds() {
            return brokerIds();
        }

        @Override // kafka.test.ClusterInstance
        public IntegrationTestHarness getUnderlying() {
            return this.clusterReference.get();
        }

        @Override // kafka.test.ClusterInstance
        public ClusterInstance duplicateCluster(Consumer<ClusterConfig.Builder> consumer) {
            ClusterConfig copyOf;
            if (this.config.clusterTypes().contains(Type.ZK_TO_KRAFT)) {
                HashMap hashMap = new HashMap(this.config.serverProperties());
                hashMap.put("zookeeper.metadata.migration.enable", "false");
                hashMap.remove("controller.quorum.voters");
                hashMap.remove("controller.listener.names");
                copyOf = this.config.copyOf(builder -> {
                    consumer.accept(builder);
                    builder.setTypes(Collections.singleton(Type.ZK));
                    builder.setFirstBrokerId(100);
                    if (this.config.numControllers() < 1) {
                        builder.setControllers(1);
                    }
                    builder.setServerProperties(hashMap);
                });
            } else if (this.config.clusterTypes().contains(Type.ZK)) {
                HashMap hashMap2 = new HashMap(this.config.serverProperties());
                hashMap2.put("zookeeper.metadata.migration.enable", "false");
                hashMap2.remove("controller.quorum.voters");
                hashMap2.remove("controller.listener.names");
                copyOf = this.config.copyOf(builder2 -> {
                    consumer.accept(builder2);
                    builder2.setServerProperties(hashMap2);
                });
            } else {
                copyOf = this.config.copyOf(consumer);
            }
            ZkClusterInstance zkClusterInstance = new ZkClusterInstance(copyOf, new AtomicReference(ZkClusterInvocationContext.createClusterReference(copyOf)));
            if (this.config.isAutoStart()) {
                zkClusterInstance.start();
                try {
                    zkClusterInstance.waitForReadyBrokers();
                } catch (InterruptedException e) {
                    throw new RuntimeException("Interrupted while waiting for ready brokers", e);
                }
            }
            return zkClusterInstance;
        }

        @Override // kafka.test.ClusterInstance
        public Admin createAdminClient(Properties properties) {
            if (!properties.contains("bootstrap.servers")) {
                properties.put("bootstrap.servers", bootstrapServers());
            }
            return this.clusterReference.get().createConfluentAdminClient(properties);
        }

        @Override // kafka.test.ClusterInstance
        public void start() {
            if (this.started.compareAndSet(false, true)) {
                this.clusterReference.get().setUp(new EmptyTestInfo());
            }
        }

        @Override // kafka.test.ClusterInstance
        public void stop() {
            if (this.stopped.compareAndSet(false, true)) {
                this.clusterReference.get().tearDown();
            }
        }

        @Override // kafka.test.ClusterInstance
        public void shutdownBroker(int i) {
            findBrokerOrThrow(i).shutdown();
        }

        @Override // kafka.test.ClusterInstance
        public void startBroker(int i) {
            findBrokerOrThrow(i).startup();
        }

        @Override // kafka.test.ClusterInstance
        public void rollingBrokerRestart() {
            if (!this.started.get()) {
                throw new IllegalStateException("Tried to restart brokers but the cluster has not been started!");
            }
            for (int i = 0; i < this.clusterReference.get().brokerCount(); i++) {
                this.clusterReference.get().killBroker(i);
            }
            this.clusterReference.get().restartDeadBrokers(true);
            this.clusterReference.get().adminClientConfig().put("bootstrap.servers", bootstrapServers());
        }

        public void rollingBrokerRestart(Optional<ClusterConfig> optional) {
            Objects.requireNonNull(optional);
            if (!this.started.get()) {
                throw new IllegalStateException("Tried to restart brokers but the cluster has not been started!");
            }
            for (int i = 0; i < this.clusterReference.get().brokerCount(); i++) {
                this.clusterReference.get().killBroker(i);
            }
            optional.ifPresent(clusterConfig -> {
                this.clusterReference.get().setClusterConfig(clusterConfig);
            });
            this.clusterReference.get().restartDeadBrokers(true);
            this.clusterReference.get().adminClientConfig().put("bootstrap.servers", bootstrapServers());
        }

        @Override // kafka.test.ClusterInstance
        public void killAllBrokers() {
            if (!this.started.get()) {
                throw new IllegalStateException("Tried to restart brokers but the cluster has not been started!");
            }
            for (int i = 0; i < this.clusterReference.get().brokerCount(); i++) {
                this.clusterReference.get().killBroker(i);
            }
        }

        @Override // kafka.test.ClusterInstance
        public void waitForReadyBrokers() throws InterruptedException {
            org.apache.kafka.test.TestUtils.waitForCondition(() -> {
                return this.clusterReference.get().zkClient().getAllBrokersInCluster().size() == this.config.numBrokers();
            }, "Timed out while waiting for brokers to become ready");
        }

        private KafkaServer findBrokerOrThrow(int i) {
            return (KafkaServer) brokers().values().stream().filter(kafkaBroker -> {
                return kafkaBroker.config().brokerId() == i;
            }).map(kafkaBroker2 -> {
                return (KafkaServer) kafkaBroker2;
            }).findFirst().orElseThrow(() -> {
                return new IllegalArgumentException("Unknown brokerId " + i);
            });
        }

        @Override // kafka.test.ClusterInstance
        public Map<Integer, ControllerServer> controllers() {
            return Collections.emptyMap();
        }

        @Override // kafka.test.ClusterInstance
        public Map<Integer, KafkaBroker> brokers() {
            return (Map) JavaConverters.asJavaCollection(this.clusterReference.get().servers()).stream().collect(Collectors.toMap(kafkaServer -> {
                return Integer.valueOf(kafkaServer.config().brokerId());
            }, kafkaServer2 -> {
                return kafkaServer2;
            }));
        }

        @Override // kafka.test.ClusterInstance
        public Map<Integer, KafkaBroker> brokersMap() {
            return (Map) brokers().values().stream().collect(Collectors.toMap(kafkaBroker -> {
                return Integer.valueOf(kafkaBroker.config().nodeId());
            }, Function.identity()));
        }
    }

    public ZkClusterInvocationContext(String str, ClusterConfig clusterConfig) {
        this.baseDisplayName = str;
        this.clusterConfig = clusterConfig;
    }

    public String getDisplayName(int i) {
        return String.format("%s [%d] Type=ZK, %s", this.baseDisplayName, Integer.valueOf(i), String.join(",", this.clusterConfig.displayTags()));
    }

    public List<Extension> getAdditionalExtensions() {
        if (this.clusterConfig.numControllers() != 1) {
            throw new IllegalArgumentException("For ZK clusters, please specify exactly 1 controller.");
        }
        ZkClusterInstance zkClusterInstance = new ZkClusterInstance(this.clusterConfig, this.clusterReference);
        return Arrays.asList(extensionContext -> {
            this.clusterReference.set(new ClusterConfigurableIntegrationHarness(this.clusterConfig));
            if (this.clusterConfig.isAutoStart()) {
                zkClusterInstance.start();
            }
        }, extensionContext2 -> {
            zkClusterInstance.stop();
        }, new ClusterInstanceParameterResolver(zkClusterInstance));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static ClusterConfigurableIntegrationHarness createClusterReference(final ClusterConfig clusterConfig) {
        return new ClusterConfigurableIntegrationHarness(clusterConfig) { // from class: kafka.test.junit.ZkClusterInvocationContext.1
            @Override // kafka.test.junit.ZkClusterInvocationContext.ClusterConfigurableIntegrationHarness, kafka.api.IntegrationTestHarness
            public void modifyConfigs(Seq<Properties> seq) {
                super.modifyConfigs(seq);
                for (int i = 0; i < seq.length(); i++) {
                    ((Properties) seq.apply(i)).putAll(clusterConfig.perServerOverrideProperties().getOrDefault(Integer.valueOf(i), Collections.emptyMap()));
                }
            }

            @Override // kafka.test.junit.ZkClusterInvocationContext.ClusterConfigurableIntegrationHarness, kafka.api.IntegrationTestHarness
            public Properties serverConfig() {
                Properties properties = new Properties();
                properties.putAll(clusterConfig.serverProperties());
                properties.put("inter.broker.protocol.version", clusterConfig.metadataVersion().versionWithSuffix());
                return properties;
            }

            @Override // kafka.test.junit.ZkClusterInvocationContext.ClusterConfigurableIntegrationHarness, kafka.api.IntegrationTestHarness
            public Properties adminClientConfig() {
                Properties properties = new Properties();
                properties.putAll(clusterConfig.adminClientProperties());
                return properties;
            }

            @Override // kafka.test.junit.ZkClusterInvocationContext.ClusterConfigurableIntegrationHarness, kafka.api.IntegrationTestHarness
            public Properties consumerConfig() {
                Properties properties = new Properties();
                properties.putAll(clusterConfig.consumerProperties());
                return properties;
            }

            @Override // kafka.test.junit.ZkClusterInvocationContext.ClusterConfigurableIntegrationHarness, kafka.api.IntegrationTestHarness
            public Properties producerConfig() {
                Properties properties = new Properties();
                properties.putAll(clusterConfig.producerProperties());
                return properties;
            }

            @Override // kafka.test.junit.ZkClusterInvocationContext.ClusterConfigurableIntegrationHarness, kafka.integration.KafkaServerTestHarness
            public SecurityProtocol securityProtocol() {
                return clusterConfig.securityProtocol();
            }

            @Override // kafka.test.junit.ZkClusterInvocationContext.ClusterConfigurableIntegrationHarness, kafka.integration.KafkaServerTestHarness
            public ListenerName listenerName() {
                return (ListenerName) clusterConfig.listenerName().map(ListenerName::normalised).orElseGet(() -> {
                    return ListenerName.forSecurityProtocol(securityProtocol());
                });
            }

            @Override // kafka.test.junit.ZkClusterInvocationContext.ClusterConfigurableIntegrationHarness, kafka.integration.KafkaServerTestHarness
            /* renamed from: serverSaslProperties */
            public Option<Properties> mo23serverSaslProperties() {
                if (clusterConfig.saslServerProperties().isEmpty()) {
                    return Option.empty();
                }
                Properties properties = new Properties();
                properties.putAll(clusterConfig.saslServerProperties());
                return Option.apply(properties);
            }

            @Override // kafka.test.junit.ZkClusterInvocationContext.ClusterConfigurableIntegrationHarness, kafka.integration.KafkaServerTestHarness
            /* renamed from: clientSaslProperties */
            public Option<Properties> mo22clientSaslProperties() {
                if (clusterConfig.saslClientProperties().isEmpty()) {
                    return Option.empty();
                }
                Properties properties = new Properties();
                properties.putAll(clusterConfig.saslClientProperties());
                return Option.apply(properties);
            }

            @Override // kafka.test.junit.ZkClusterInvocationContext.ClusterConfigurableIntegrationHarness, kafka.api.IntegrationTestHarness
            public int brokerCount() {
                return clusterConfig.numBrokers();
            }

            @Override // kafka.test.junit.ZkClusterInvocationContext.ClusterConfigurableIntegrationHarness, kafka.integration.KafkaServerTestHarness
            /* renamed from: trustStoreFile */
            public Option<File> mo24trustStoreFile() {
                return OptionConverters.toScala(clusterConfig.trustStoreFile());
            }
        };
    }
}
