package kafka.test.junit;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.network.SocketServer;
import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.test.ClusterConfig;
import kafka.test.ClusterInstance;
import kafka.testkit.KafkaClusterTestKit;
import kafka.testkit.TestKitNodes;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;

/* loaded from: input_file:kafka/test/junit/RaftClusterInvocationContext.class */
public class RaftClusterInvocationContext implements TestTemplateInvocationContext {
    private final ClusterConfig clusterConfig;
    private final AtomicReference<KafkaClusterTestKit> clusterReference = new AtomicReference<>();

    /* loaded from: input_file:kafka/test/junit/RaftClusterInvocationContext$RaftClusterInstance.class */
    public static class RaftClusterInstance implements ClusterInstance {
        private final AtomicReference<KafkaClusterTestKit> clusterReference;
        private final ClusterConfig clusterConfig;
        final AtomicBoolean started = new AtomicBoolean(false);
        final AtomicBoolean stopped = new AtomicBoolean(false);
        private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue<>();

        RaftClusterInstance(AtomicReference<KafkaClusterTestKit> atomicReference, ClusterConfig clusterConfig) {
            this.clusterReference = atomicReference;
            this.clusterConfig = clusterConfig;
        }

        @Override // kafka.test.ClusterInstance
        public String bootstrapServers() {
            return this.clusterReference.get().clientProperties().getProperty("bootstrap.servers");
        }

        @Override // kafka.test.ClusterInstance
        public Collection<SocketServer> brokerSocketServers() {
            return (Collection) brokers().map((v0) -> {
                return v0.socketServer();
            }).collect(Collectors.toList());
        }

        @Override // kafka.test.ClusterInstance
        public ListenerName clientListener() {
            return ListenerName.normalised("EXTERNAL");
        }

        @Override // kafka.test.ClusterInstance
        public Collection<SocketServer> controllerSocketServers() {
            return (Collection) controllers().map((v0) -> {
                return v0.socketServer();
            }).collect(Collectors.toList());
        }

        @Override // kafka.test.ClusterInstance
        public SocketServer anyBrokerSocketServer() {
            return (SocketServer) brokers().map((v0) -> {
                return v0.socketServer();
            }).findFirst().orElseThrow(() -> {
                return new RuntimeException("No broker SocketServers found");
            });
        }

        @Override // kafka.test.ClusterInstance
        public SocketServer anyControllerSocketServer() {
            return (SocketServer) controllers().map((v0) -> {
                return v0.socketServer();
            }).findFirst().orElseThrow(() -> {
                return new RuntimeException("No controller SocketServers found");
            });
        }

        @Override // kafka.test.ClusterInstance
        public ClusterInstance.ClusterType clusterType() {
            return ClusterInstance.ClusterType.RAFT;
        }

        @Override // kafka.test.ClusterInstance
        public ClusterConfig config() {
            return this.clusterConfig;
        }

        @Override // kafka.test.ClusterInstance
        public KafkaClusterTestKit getUnderlying() {
            return this.clusterReference.get();
        }

        @Override // kafka.test.ClusterInstance
        public Admin createAdminClient(Properties properties) {
            Admin create = Admin.create(this.clusterReference.get().clientProperties());
            this.admins.add(create);
            return create;
        }

        @Override // kafka.test.ClusterInstance
        public void start() {
            if (this.started.compareAndSet(false, true)) {
                try {
                    this.clusterReference.get().startup();
                } catch (Exception e) {
                    throw new RuntimeException("Failed to start Raft server", e);
                }
            }
        }

        @Override // kafka.test.ClusterInstance
        public void stop() {
            if (this.stopped.compareAndSet(false, true)) {
                this.admins.forEach(admin -> {
                    Utils.closeQuietly(admin, "admin");
                });
                Utils.closeQuietly(this.clusterReference.get(), "cluster");
            }
        }

        @Override // kafka.test.ClusterInstance
        public void shutdownBroker(int i) {
            findBrokerOrThrow(i).shutdown();
        }

        @Override // kafka.test.ClusterInstance
        public void startBroker(int i) {
            findBrokerOrThrow(i).startup();
        }

        @Override // kafka.test.ClusterInstance
        public void waitForReadyBrokers() throws InterruptedException {
            try {
                this.clusterReference.get().waitForReadyBrokers();
            } catch (ExecutionException e) {
                throw new AssertionError("Failed while waiting for brokers to become ready", e);
            }
        }

        @Override // kafka.test.ClusterInstance
        public void rollingBrokerRestart() {
            throw new UnsupportedOperationException("Restarting Raft servers is not yet supported.");
        }

        private BrokerServer findBrokerOrThrow(int i) {
            return (BrokerServer) Optional.ofNullable(this.clusterReference.get().brokers().get(Integer.valueOf(i))).orElseThrow(() -> {
                return new IllegalArgumentException("Unknown brokerId " + i);
            });
        }

        private Stream<BrokerServer> brokers() {
            return this.clusterReference.get().brokers().values().stream();
        }

        private Stream<ControllerServer> controllers() {
            return this.clusterReference.get().controllers().values().stream();
        }
    }

    public RaftClusterInvocationContext(ClusterConfig clusterConfig) {
        this.clusterConfig = clusterConfig;
    }

    public String getDisplayName(int i) {
        return String.format("[%d] Type=Raft, %s", Integer.valueOf(i), (String) this.clusterConfig.nameTags().entrySet().stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining(", ")));
    }

    public List<Extension> getAdditionalExtensions() {
        RaftClusterInstance raftClusterInstance = new RaftClusterInstance(this.clusterReference, this.clusterConfig);
        return Arrays.asList(extensionContext -> {
            TestKitNodes build = new TestKitNodes.Builder().setNumBrokerNodes(this.clusterConfig.numBrokers()).setNumControllerNodes(this.clusterConfig.numControllers()).build();
            build.brokerNodes().forEach((num, brokerNode) -> {
                this.clusterConfig.brokerServerProperties(num.intValue()).forEach((obj, obj2) -> {
                    brokerNode.propertyOverrides().put(obj.toString(), obj2.toString());
                });
            });
            KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(build);
            this.clusterConfig.serverProperties().forEach((obj, obj2) -> {
                builder.setConfigProp(obj.toString(), obj2.toString());
            });
            KafkaClusterTestKit build2 = builder.build();
            this.clusterReference.set(build2);
            build2.format();
            build2.startup();
            TestUtils.waitUntilTrue(() -> {
                return Boolean.valueOf(build2.brokers().get(0).brokerState() == BrokerState.RUNNING);
            }, () -> {
                return "Broker never made it to RUNNING state.";
            }, 15000L, 100L);
        }, extensionContext2 -> {
            raftClusterInstance.stop();
        }, new ClusterInstanceParameterResolver(raftClusterInstance), new GenericParameterResolver(this.clusterConfig, ClusterConfig.class));
    }
}
