package kafka.test.junit;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.network.SocketServer;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.test.annotation.Type;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.metadata.BrokerState;
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import scala.Option;
import scala.compat.java8.OptionConverters;

/* loaded from: input_file:kafka/test/junit/RaftClusterInvocationContext.class */
public class RaftClusterInvocationContext implements TestTemplateInvocationContext {
    private final String baseDisplayName;
    private final ClusterConfig clusterConfig;
    private final boolean isCombined;

    /* loaded from: input_file:kafka/test/junit/RaftClusterInvocationContext$RaftClusterInstance.class */
    public static class RaftClusterInstance implements ClusterInstance {
        private final ClusterConfig clusterConfig;
        final AtomicBoolean started = new AtomicBoolean(false);
        final AtomicBoolean stopped = new AtomicBoolean(false);
        final AtomicBoolean formated = new AtomicBoolean(false);
        private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue<>();
        private EmbeddedZookeeper embeddedZookeeper;
        private KafkaClusterTestKit clusterTestKit;
        private final boolean isCombined;

        /* JADX INFO: Access modifiers changed from: package-private */
        public RaftClusterInstance(ClusterConfig clusterConfig, boolean z) {
            this.clusterConfig = clusterConfig;
            this.isCombined = z;
        }

        @Override // kafka.test.ClusterInstance
        public String bootstrapServers() {
            return this.clusterTestKit.bootstrapServers();
        }

        @Override // kafka.test.ClusterInstance
        public String bootstrapControllers() {
            return this.clusterTestKit.bootstrapControllers();
        }

        @Override // kafka.test.ClusterInstance
        public ListenerName clientListener() {
            return ListenerName.normalised("EXTERNAL");
        }

        @Override // kafka.test.ClusterInstance
        public ListenerName controllerListener() {
            return ListenerName.normalised("CONTROLLER");
        }

        @Override // kafka.test.ClusterInstance
        public Optional<ListenerName> controllerListenerName() {
            return controllers().values().stream().findAny().flatMap(controllerServer -> {
                return OptionConverters.toJava(controllerServer.config().controllerListenerNames().headOption());
            }).map(ListenerName::new);
        }

        @Override // kafka.test.ClusterInstance
        public Collection<SocketServer> controllerSocketServers() {
            return (Collection) controllers().values().stream().map((v0) -> {
                return v0.socketServer();
            }).collect(Collectors.toList());
        }

        @Override // kafka.test.ClusterInstance
        public SocketServer anyNonControllerBrokerSocketServer() {
            return anyBrokerSocketServer();
        }

        @Override // kafka.test.ClusterInstance
        public SocketServer activeController() throws InterruptedException {
            TestUtils.waitForCondition(() -> {
                return controllers().values().stream().anyMatch(controllerServer -> {
                    return controllerServer.controller().isActive();
                });
            }, "Timed out waiting for active controller");
            return (SocketServer) controllers().values().stream().filter(controllerServer -> {
                return controllerServer.controller().isActive();
            }).map((v0) -> {
                return v0.socketServer();
            }).findFirst().orElseThrow(() -> {
                return new RuntimeException("No controller SocketServers found");
            });
        }

        @Override // kafka.test.ClusterInstance
        public SocketServer linkCoordinator(String str) {
            List list = (List) brokers().values().stream().filter(kafkaBroker -> {
                return kafkaBroker.clusterLinkManager().isLinkCoordinator(str);
            }).map((v0) -> {
                return v0.socketServer();
            }).collect(Collectors.toList());
            if (list.size() != 1) {
                throw new IllegalStateException("Failed to find a single link coordinator for " + str);
            }
            return (SocketServer) list.get(0);
        }

        @Override // kafka.test.ClusterInstance
        public String clusterId() {
            return (String) Stream.concat(controllers().values().stream().map((v0) -> {
                return v0.clusterId();
            }), brokers().values().stream().map((v0) -> {
                return v0.clusterId();
            })).findFirst().orElseThrow(() -> {
                return new RuntimeException("No controllers or brokers!");
            });
        }

        @Override // kafka.test.ClusterInstance
        public Type type() {
            return this.isCombined ? Type.CO_KRAFT : Type.KRAFT;
        }

        @Override // kafka.test.ClusterInstance
        public ClusterConfig config() {
            return this.clusterConfig;
        }

        @Override // kafka.test.ClusterInstance
        public Set<Integer> controllerIds() {
            return controllers().keySet();
        }

        @Override // kafka.test.ClusterInstance
        public KafkaClusterTestKit getUnderlying() {
            return this.clusterTestKit;
        }

        @Override // kafka.test.ClusterInstance
        public RaftClusterInstance duplicateCluster(Consumer<ClusterConfig.Builder> consumer) {
            try {
                HashMap hashMap = new HashMap(this.clusterConfig.serverProperties());
                hashMap.put("zookeeper.metadata.migration.enable", "false");
                RaftClusterInstance raftClusterInstance = new RaftClusterInstance(this.clusterConfig.copyOf(builder -> {
                    consumer.accept(builder);
                    builder.setServerProperties(hashMap);
                }), this.isCombined);
                if (this.clusterConfig.isAutoStart()) {
                    raftClusterInstance.start();
                    try {
                        raftClusterInstance.waitForReadyBrokers();
                    } catch (InterruptedException e) {
                        throw new RuntimeException("Interrupted while waiting for ready brokers", e);
                    }
                }
                return raftClusterInstance;
            } catch (Exception e2) {
                throw new RuntimeException("Failed to create duplicate cluster", e2);
            }
        }

        @Override // kafka.test.ClusterInstance
        public Admin createAdminClient(Properties properties) {
            Admin create = Admin.create(this.clusterTestKit.newClientPropertiesBuilder(properties).build());
            this.admins.add(create);
            return create;
        }

        @Override // kafka.test.ClusterInstance
        public void start() {
            try {
                format();
                if (this.started.compareAndSet(false, true)) {
                    this.clusterTestKit.startup();
                }
            } catch (Exception e) {
                throw new RuntimeException("Failed to start Raft server", e);
            }
        }

        @Override // kafka.test.ClusterInstance
        public void stop() {
            if (this.stopped.compareAndSet(false, true)) {
                this.admins.forEach(admin -> {
                    Utils.closeQuietly(admin, "admin");
                });
                this.admins.clear();
                Utils.closeQuietly(this.clusterTestKit, "cluster");
                if (this.embeddedZookeeper != null) {
                    Utils.closeQuietly(this.embeddedZookeeper, "zk");
                }
            }
        }

        @Override // kafka.test.ClusterInstance
        public void shutdownBroker(int i) {
            findBrokerOrThrow(i).shutdown();
        }

        @Override // kafka.test.ClusterInstance
        public void startBroker(int i) {
            findBrokerOrThrow(i).startup();
        }

        @Override // kafka.test.ClusterInstance
        public void waitForReadyBrokers() throws InterruptedException {
            try {
                this.clusterTestKit.waitForReadyBrokers();
            } catch (ExecutionException e) {
                throw new AssertionError("Failed while waiting for brokers to become ready", e);
            }
        }

        @Override // kafka.test.ClusterInstance
        public Map<Integer, KafkaBroker> brokersMap() {
            return (Map) brokers().values().stream().collect(Collectors.toMap(kafkaBroker -> {
                return Integer.valueOf(kafkaBroker.config().nodeId());
            }, Function.identity()));
        }

        @Override // kafka.test.ClusterInstance
        public void rollingBrokerRestart() {
            throw new UnsupportedOperationException("Restarting Raft servers is not yet supported.");
        }

        @Override // kafka.test.ClusterInstance
        public void killAllBrokers() {
            throw new UnsupportedOperationException("Restarting Raft servers is not yet supported.");
        }

        public Map<Integer, MetadataImage> metadataImage() {
            return (Map) brokers().values().stream().collect(Collectors.toMap(kafkaBroker -> {
                return Integer.valueOf(kafkaBroker.config().nodeId());
            }, kafkaBroker2 -> {
                return ((BrokerServer) kafkaBroker2).metadataCache().currentImage();
            }));
        }

        @Override // kafka.test.ClusterInstance
        public Map<Integer, KafkaBroker> brokers() {
            return (Map) this.clusterTestKit.brokers().entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            }));
        }

        @Override // kafka.test.ClusterInstance
        public Map<Integer, ControllerServer> controllers() {
            return Collections.unmodifiableMap(this.clusterTestKit.controllers());
        }

        public void format() throws Exception {
            if (this.formated.compareAndSet(false, true)) {
                ArrayList arrayList = new ArrayList();
                arrayList.add(new ApiMessageAndVersion(new FeatureLevelRecord().setName("confluent.metadata.version").setFeatureLevel(this.clusterConfig.metadataVersion().confluentFeatureLevel()), (short) 0));
                this.clusterConfig.features().forEach((features, sh) -> {
                    arrayList.add(new ApiMessageAndVersion(new FeatureLevelRecord().setName(features.featureName()).setFeatureLevel(sh.shortValue()), (short) 0));
                });
                KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(new TestKitNodes.Builder().setBootstrapMetadata(BootstrapMetadata.fromRecords(arrayList, "testkit", false)).setCombined(this.isCombined).setNumBrokerNodes(this.clusterConfig.numBrokers()).setNumDisksPerBroker(this.clusterConfig.numDisksPerBroker()).setPerServerProperties(this.clusterConfig.perServerOverrideProperties()).setNumControllerNodes(this.clusterConfig.numControllers()).build());
                if (Boolean.parseBoolean(this.clusterConfig.serverProperties().getOrDefault("zookeeper.metadata.migration.enable", "false"))) {
                    this.embeddedZookeeper = new EmbeddedZookeeper();
                    builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", Integer.valueOf(this.embeddedZookeeper.port())));
                }
                Map<String, String> serverProperties = this.clusterConfig.serverProperties();
                builder.getClass();
                serverProperties.forEach((v1, v2) -> {
                    r1.setConfigProp(v1, v2);
                });
                this.clusterTestKit = builder.build();
                this.clusterTestKit.format();
            }
        }

        public Map.Entry<KafkaClusterTestKit, EmbeddedZookeeper> createClusterReference(ClusterConfig clusterConfig, boolean z, Uuid uuid, Optional<EmbeddedZookeeper> optional) throws Exception {
            EmbeddedZookeeper embeddedZookeeper = null;
            KafkaClusterTestKit kafkaClusterTestKit = null;
            try {
                TestKitNodes build = new TestKitNodes.Builder().setClusterId(uuid.toString()).setBootstrapMetadataVersion(clusterConfig.metadataVersion()).setCombined(z).setNumBrokerNodes(clusterConfig.numBrokers()).setNumControllerNodes(clusterConfig.numControllers()).build();
                build.brokerNodes().forEach((num, testKitNode) -> {
                    clusterConfig.perServerOverrideProperties().getOrDefault(num, Collections.emptyMap()).forEach((str, str2) -> {
                        testKitNode.propertyOverrides().put(str.toString(), str2.toString());
                    });
                });
                KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(build);
                if (Boolean.parseBoolean(clusterConfig.serverProperties().getOrDefault("zookeeper.metadata.migration.enable", "false"))) {
                    embeddedZookeeper = optional.orElseGet(EmbeddedZookeeper::new);
                    builder.setConfigProp("zookeeper.connect", String.format("localhost:%d", Integer.valueOf(embeddedZookeeper.port())));
                }
                clusterConfig.serverProperties().forEach((str, str2) -> {
                    builder.setConfigProp(str.toString(), str2.toString());
                });
                kafkaClusterTestKit = builder.build();
                this.clusterTestKit = kafkaClusterTestKit;
                this.clusterTestKit.format();
                this.formated.set(true);
            } catch (Exception e) {
                if (embeddedZookeeper != null && !optional.isPresent()) {
                    embeddedZookeeper.shutdown();
                }
            }
            return new AbstractMap.SimpleImmutableEntry(kafkaClusterTestKit, embeddedZookeeper);
        }

        private BrokerServer findBrokerOrThrow(int i) {
            return (BrokerServer) Optional.ofNullable(this.clusterTestKit.brokers().get(Integer.valueOf(i))).orElseThrow(() -> {
                return new IllegalArgumentException("Unknown brokerId " + i);
            });
        }

        public Option<CompletableFuture<Integer>> interBrokerPortFuture() {
            return this.clusterTestKit.interBrokerPortFuture();
        }

        @Override // kafka.test.ClusterInstance
        public /* bridge */ /* synthetic */ ClusterInstance duplicateCluster(Consumer consumer) {
            return duplicateCluster((Consumer<ClusterConfig.Builder>) consumer);
        }
    }

    public RaftClusterInvocationContext(String str, ClusterConfig clusterConfig, boolean z) {
        this.baseDisplayName = str;
        this.clusterConfig = clusterConfig;
        this.isCombined = z;
    }

    public String getDisplayName(int i) {
        Object[] objArr = new Object[4];
        objArr[0] = this.baseDisplayName;
        objArr[1] = Integer.valueOf(i);
        objArr[2] = this.isCombined ? "Combined" : "Isolated";
        objArr[3] = String.join(",", this.clusterConfig.displayTags());
        return String.format("%s [%d] Type=Raft-%s, %s", objArr);
    }

    public List<Extension> getAdditionalExtensions() {
        RaftClusterInstance raftClusterInstance = new RaftClusterInstance(this.clusterConfig, this.isCombined);
        return Arrays.asList(extensionContext -> {
            raftClusterInstance.format();
            if (this.clusterConfig.isAutoStart()) {
                raftClusterInstance.start();
                kafka.utils.TestUtils.waitUntilTrue(() -> {
                    return Boolean.valueOf(raftClusterInstance.getUnderlying().brokers().values().stream().allMatch(brokerServer -> {
                        return brokerServer.brokerState() == BrokerState.RUNNING;
                    }));
                }, () -> {
                    return "Broker never made it to RUNNING state.";
                }, 15000L, 100L);
            }
        }, extensionContext2 -> {
            raftClusterInstance.stop();
        }, new ClusterInstanceParameterResolver(raftClusterInstance));
    }
}
