/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.test;

import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuthorizer;
import io.confluent.kafka.multitenant.integration.cluster.LogicalClusterUser;
import io.confluent.kafka.multitenant.integration.cluster.PhysicalCluster;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthBearerServerLoginCallbackHandler;
import io.confluent.kafka.server.plugins.auth.oauth.OAuthBearerValidatorCallbackHandler;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.HostResolver;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.Time;

public class IntegrationTestHarness {
    private static final int DEFAULT_BROKERS_IN_PHYSICAL_CLUSTER = 1;
    private PhysicalCluster physicalCluster;
    private final List<KafkaProducer<?, ?>> producers = new ArrayList();
    private final List<KafkaConsumer<?, ?>> consumers = new ArrayList();
    private final List<AdminClient> adminClients = new ArrayList<AdminClient>();
    private final int brokersInPhysicalCluster;
    private final List<String> brokerRacks;
    private final List<String> brokerCells;

    public IntegrationTestHarness() {
        this(1);
    }

    public IntegrationTestHarness(int brokersInPhysicalCluster) {
        this(brokersInPhysicalCluster, Collections.emptyList());
    }

    public IntegrationTestHarness(int brokersInPhysicalCluster, List<String> brokerRacks) {
        this(brokersInPhysicalCluster, brokerRacks, Collections.emptyList());
    }

    public IntegrationTestHarness(int brokersInPhysicalCluster, List<String> brokerRacks, List<String> brokerCells) {
        if (!brokerRacks.isEmpty() && brokerRacks.size() != brokersInPhysicalCluster) {
            throw new IllegalArgumentException("Broker racks should be empty or should be same as number of brokers.");
        }
        if (!brokerCells.isEmpty() && brokerCells.size() != brokersInPhysicalCluster) {
            throw new IllegalArgumentException("Broker cells should be empty or should be same as number of brokers.");
        }
        this.brokersInPhysicalCluster = brokersInPhysicalCluster;
        this.brokerRacks = brokerRacks;
        this.brokerCells = brokerCells;
    }

    public PhysicalCluster start(Properties brokerOverrideProps) {
        return this.start(brokerOverrideProps, Optional.empty(), PhysicalCluster::makeBrokerSuperUser);
    }

    public PhysicalCluster start(Properties brokerOverrideProps, Optional<Time> time, Consumer<PhysicalCluster> configureSecurityForBroker) {
        this.physicalCluster = new PhysicalCluster(this.brokersInPhysicalCluster, this.brokerRacks, this.brokerCells, brokerOverrideProps, time);
        this.physicalCluster.start(configureSecurityForBroker);
        return this.physicalCluster;
    }

    public void shutdownBrokers() {
        if (this.physicalCluster != null) {
            this.physicalCluster.kafkaCluster().shutdownBrokers();
        }
    }

    public void startBrokers() {
        if (this.physicalCluster != null) {
            this.physicalCluster.kafkaCluster().startBrokersAfterShutdown();
        }
    }

    public void shutdown() {
        this.producers.forEach(KafkaProducer::close);
        this.consumers.forEach(KafkaConsumer::close);
        this.adminClients.forEach(Admin::close);
        if (this.physicalCluster != null) {
            this.physicalCluster.shutdown();
        }
    }

    public String zkConnect() {
        return this.physicalCluster.kafkaCluster().zkConnect();
    }

    public KafkaProducer<String, String> createProducer(LogicalClusterUser user, SecurityProtocol securityProtocol) {
        KafkaProducer<String, String> producer = KafkaTestUtils.createProducer(this.physicalCluster.bootstrapServers(), securityProtocol, ScramMechanism.SCRAM_SHA_256.mechanismName(), user.saslJaasConfig());
        this.producers.add(producer);
        return producer;
    }

    public KafkaConsumer<String, String> createConsumer(LogicalClusterUser user, String consumerGroup, SecurityProtocol securityProtocol) {
        KafkaConsumer<String, String> consumer = KafkaTestUtils.createConsumer(this.physicalCluster.bootstrapServers(), securityProtocol, ScramMechanism.SCRAM_SHA_256.mechanismName(), user.saslJaasConfig(), consumerGroup);
        this.consumers.add(consumer);
        return consumer;
    }

    public AdminClient createAdminClient(LogicalClusterUser user) {
        AdminClient adminClient = KafkaTestUtils.createAdminClient(this.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, ScramMechanism.SCRAM_SHA_256.mechanismName(), user.saslJaasConfig());
        this.adminClients.add(adminClient);
        return adminClient;
    }

    public AdminClient createOAuthAdminClient(String jaasConfig, Properties properties) {
        AdminClient adminClient = KafkaTestUtils.createAdminClient(this.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, "OAUTHBEARER", jaasConfig, properties);
        this.adminClients.add(adminClient);
        return adminClient;
    }

    public AdminClient createSSLOAuthAdminClient(String jaasConfig, Properties properties, Map<String, Object> certStore, String sslEngineFactoryPath) {
        if (certStore != null) {
            properties.putAll(certStore);
        }
        if (sslEngineFactoryPath != null) {
            properties.put("ssl.engine.factory.class", sslEngineFactoryPath);
        }
        AdminClient adminClient = KafkaTestUtils.createAdminClient(this.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_SSL, "OAUTHBEARER", jaasConfig, properties);
        this.adminClients.add(adminClient);
        return adminClient;
    }

    public AdminClient createPlainAuthAdminClient(String jaasConfig) {
        return this.createPlainAuthAdminClient(jaasConfig, new Properties());
    }

    public AdminClient createPlainAuthAdminClient(String jaasConfig, Properties props) {
        AdminClient adminClient = KafkaTestUtils.createAdminClient(this.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_PLAINTEXT, "PLAIN", jaasConfig, props);
        this.adminClients.add(adminClient);
        return adminClient;
    }

    public AdminClient createSSLAuthAdminClient(String jaasConfig, Map<String, Object> certStore) {
        return this.createSSLAuthAdminClient(jaasConfig, certStore, null, null);
    }

    public AdminClient createSSLAuthAdminClient(String jaasConfig, Map<String, Object> certStore, Class<? extends HostResolver> hostResolverClass) {
        return this.createSSLAuthAdminClient(jaasConfig, certStore, null, hostResolverClass);
    }

    public AdminClient createSSLAuthAdminClient(String jaasConfig, Map<String, Object> certStore, String sslEngineFactoryPath) {
        return this.createSSLAuthAdminClient(jaasConfig, certStore, sslEngineFactoryPath, null);
    }

    public AdminClient createSSLAuthAdminClient(String jaasConfig, Map<String, Object> certStore, String sslEngineFactoryPath, Class<? extends HostResolver> hostResolverClass) {
        Properties props = new Properties();
        props.put("ssl.truststore.location", certStore.get("ssl.truststore.location"));
        props.put("ssl.truststore.password", ((Password)certStore.get("ssl.truststore.password")).value());
        props.put("security.protocol", "SSL");
        if (sslEngineFactoryPath != null) {
            props.put("ssl.engine.factory.class", sslEngineFactoryPath);
        }
        if (hostResolverClass != null) {
            props.put("host.resolver.class", hostResolverClass.getName());
        }
        AdminClient adminClient = KafkaTestUtils.createAdminClient(this.physicalCluster.bootstrapServers(), SecurityProtocol.SASL_SSL, "PLAIN", jaasConfig, props);
        this.adminClients.add(adminClient);
        return adminClient;
    }

    public void produceConsume(LogicalClusterUser producerUser, LogicalClusterUser consumerUser, String topic, String consumerGroup, int firstMessageIndex, SecurityProtocol securityProtocol) throws Throwable {
        String prefixedTopic = producerUser.tenantPrefix() + topic;
        this.physicalCluster.kafkaCluster().createTopic(prefixedTopic, 2, 1);
        try (KafkaProducer<String, String> producer = this.createProducer(producerUser, securityProtocol);){
            KafkaTestUtils.sendRecords(producer, topic, firstMessageIndex, 10);
        }
        var9_9 = null;
        try (KafkaConsumer<String, String> consumer = this.createConsumer(consumerUser, consumerGroup, securityProtocol);){
            KafkaTestUtils.consumeRecords(consumer, topic, firstMessageIndex, 10);
        }
        catch (Throwable throwable) {
            var9_9 = throwable;
            throw throwable;
        }
    }

    public void produceConsume(LogicalClusterUser producerUser, LogicalClusterUser consumerUser, String topic, String consumerGroup, int firstMessageIndex) throws Throwable {
        this.produceConsume(producerUser, consumerUser, topic, consumerGroup, firstMessageIndex, SecurityProtocol.SASL_PLAINTEXT);
    }

    public static SSLEngine setSniHostName(SSLEngine sslEngine, String sniHostName) {
        SSLParameters sslParameters = sslEngine.getSSLParameters();
        List<SNIServerName> sniServerNameList = Collections.singletonList(new SNIHostName(sniHostName));
        sslParameters.setServerNames(sniServerNameList);
        sslEngine.setSSLParameters(sslParameters);
        return sslEngine;
    }

    public static String clientPlainJaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required  username=\"" + username + "\"  password=\"" + password + "\";";
    }

    public static Properties defaultOAuthBrokerProps() {
        Properties props = new Properties();
        props.put("sasl.enabled.mechanisms", Collections.singletonList("OAUTHBEARER"));
        props.put(KafkaConfig.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        props.put("confluent.security.event.logger.authentication.enable", "true");
        props.put(KafkaConfig.AuthorizerClassNameProp(), MultiTenantAuthorizer.class.getName());
        props.put("confluent.security.event.logger.multitenant.enable", "true");
        props.put("ce.broker.plugins.test.audit.provider.config", "TEST");
        props.put("listener.name.external.oauthbearer.sasl.login.callback.handler.class", OAuthBearerServerLoginCallbackHandler.class.getName());
        props.put("listener.name.external.oauthbearer.sasl.server.callback.handler.class", OAuthBearerValidatorCallbackHandler.class.getName());
        props.put("listener.name.external.principal.builder.class", MultiTenantPrincipalBuilder.class.getName());
        return props;
    }
}

