/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.integration.cluster;

import io.confluent.kafka.clients.CloudAdmin;
import io.confluent.kafka.multitenant.BasePhysicalClusterMetadata;
import io.confluent.kafka.multitenant.MultiTenantInterceptor;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.MultiTenantPrincipalBuilder;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.kafka.multitenant.TopicBasedPhysicalClusterMetadata;
import io.confluent.kafka.multitenant.integration.cluster.LogicalCluster;
import io.confluent.kafka.multitenant.integration.cluster.UserMetadata;
import io.confluent.kafka.multitenant.integration.test.DummyMultitenantMetadata;
import io.confluent.kafka.server.plugins.auth.DefaultUserMetaDataStore;
import io.confluent.kafka.test.cluster.EmbeddedKafkaCluster;
import io.confluent.kafka.test.utils.AclCommandBuilder;
import io.confluent.kafka.test.utils.ClientSecuritySpec;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import io.confluent.kafka.traffic.TopicBasedTrafficNetworkIdRoutesStore;
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import kafka.security.authorizer.AclAuthorizer;
import kafka.server.BrokerSession;
import kafka.server.KafkaConfig;
import kafka.utils.EmptyTestInfo;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.network.PublicCredential;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.CredentialCache;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.security.scram.internals.ScramServerCallbackHandler;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PhysicalCluster {
    // Class-wide logger.
    private static final Logger log = LoggerFactory.getLogger(PhysicalCluster.class);
    // Principal that MultiTenantScramPrincipalBuilder returns for PLAINTEXT connections; it is added to the
    // super-user list by commonConfigs() unless isBrokerSuperUser has been switched off.
    public static final KafkaPrincipal BROKER_PRINCIPAL = new KafkaPrincipal("User", "broker");
    // SASL user names are parsed as "<clusterId>_<apiKey>"; clusterId is everything before the first underscore.
    private static final Pattern SASL_USERNAME_PATTERN = Pattern.compile("(?<clusterId>[^_]*)_(?<apiKey>.*)");
    // Registry of live clusters keyed by physicalClusterId: filled by the constructor, emptied by shutdown(),
    // and consulted by MultiTenantScramPrincipalBuilder.configure() via the "physical.cluster.id" config.
    private static Map<String, PhysicalCluster> instances = new ConcurrentHashMap<String, PhysicalCluster>();
    // NOTE(review): System.identityHashCode is not guaranteed to be unique across live objects, so two
    // clusters could in principle share an id and overwrite each other in `instances` - confirm acceptable.
    private final String physicalClusterId = String.valueOf(System.identityHashCode(this));
    // Embedded Kafka cluster that all start/shutdown/bootstrap calls delegate to.
    private final EmbeddedKafkaCluster kafkaCluster;
    // Lazily created by superAdminClient(); closed in shutdown().
    private AdminClient superAdminClient;
    // Lazily created by superConfluentAdmin(); closed in shutdown().
    private CloudAdmin cloudAdmin;
    // Source of the random suffix of generated API secrets.
    private final Random random;
    // Users created through getOrCreateUser(), indexed by numeric user id.
    private final Map<Integer, UserMetadata> usersById;
    // Same users indexed by API key; principal() reads this map.
    private final Map<String, UserMetadata> usersByApiKey;
    // Logical clusters registered by createLogicalCluster(), keyed by logical cluster id.
    // NOTE(review): this is a plain HashMap written under the instance lock but read by the
    // unsynchronized principal() - verify which threads call principal().
    private final Map<String, LogicalCluster> logicalClusters;
    // Number of brokers startBrokers() will launch.
    private final int numberOfBrokers;
    // Optional per-broker rack values, indexed by broker position; an empty list means no rack is set.
    private final List<String> brokerRacks;
    // Optional per-broker cell tag values, indexed by broker position; an empty list means no cell tag is set.
    private final List<String> brokerCells;
    // Whether BROKER_PRINCIPAL is included in the super-user list; see disableBrokerSuperUser().
    private boolean isBrokerSuperUser = true;

    /**
     * Creates a cluster with the given broker count, no racks, no cells and the default clock.
     *
     * @param brokers number of brokers to launch in {@code startBrokers}
     */
    public PhysicalCluster(int brokers) {
        this(brokers, Collections.<String>emptyList(), Optional.<Time>empty());
    }

    /**
     * Creates a cluster with per-broker racks and no cell tags.
     *
     * @param brokers     number of brokers to launch
     * @param brokerRacks rack per broker position, or an empty list for none
     * @param time        clock handed to the embedded cluster; a {@code MockTime} is used when empty
     */
    public PhysicalCluster(int brokers, List<String> brokerRacks, Optional<Time> time) {
        this(brokers, brokerRacks, Collections.<String>emptyList(), time);
    }

    /**
     * Creates a cluster with per-broker racks and cell tags, using an {@code EmptyTestInfo}.
     *
     * @param brokers     number of brokers to launch
     * @param brokerRacks rack per broker position, or an empty list for none
     * @param brokerCells cell tag per broker position, or an empty list for none
     * @param time        clock handed to the embedded cluster; a {@code MockTime} is used when empty
     */
    public PhysicalCluster(int brokers, List<String> brokerRacks, List<String> brokerCells, Optional<Time> time) {
        this(brokers, brokerRacks, brokerCells, time, (TestInfo) new EmptyTestInfo());
    }

    public PhysicalCluster(int brokers, List<String> brokerRacks, List<String> brokerCells, Optional<Time> time, TestInfo testInfo) {
        this.kafkaCluster = new EmbeddedKafkaCluster(time.orElse((Time)new MockTime()), testInfo);
        this.random = new Random();
        this.usersById = new HashMap<Integer, UserMetadata>();
        this.usersByApiKey = new ConcurrentHashMap<String, UserMetadata>();
        this.logicalClusters = new HashMap<String, LogicalCluster>();
        this.numberOfBrokers = brokers;
        this.brokerRacks = brokerRacks;
        this.brokerCells = brokerCells;
        instances.put(this.physicalClusterId, this);
    }

    /**
     * Stops {@link #BROKER_PRINCIPAL} from being added to the super-user list in subsequently
     * generated configs.
     *
     * @throws IllegalArgumentException if the cluster runs in KRaft mode
     */
    public void disableBrokerSuperUser() {
        if (!this.isKRaft()) {
            this.isBrokerSuperUser = false;
            return;
        }
        throw new IllegalArgumentException("KRaft requires broker principal to be a super user");
    }

    /**
     * Builds the configuration shared by controllers and brokers: the principal builder class, this
     * cluster's id, the inter-broker listener name, the controller listener name (KRaft only) and
     * the super-user list.
     *
     * @param additionalSuperUsers principals to make super users besides the broker principal
     * @return a fresh {@code Properties} instance the caller may modify
     */
    private Properties commonConfigs(Set<String> additionalSuperUsers) {
        Set<String> superUsers = new HashSet<>(additionalSuperUsers);
        if (this.isBrokerSuperUser) {
            superUsers.add(BROKER_PRINCIPAL.toString());
        }

        Properties props = new Properties();
        props.put("principal.builder.class", MultiTenantScramPrincipalBuilder.class.getName());
        props.setProperty("physical.cluster.id", this.physicalClusterId);
        props.setProperty(KafkaConfig.InterBrokerListenerNameProp(), "INTERNAL");
        if (this.isKRaft()) {
            props.setProperty(KafkaConfig.ControllerListenerNamesProp(), "CONTROLLER");
        }
        // The super-user property is left unset entirely when there is nobody to list.
        if (!superUsers.isEmpty()) {
            props.setProperty(AclAuthorizer.SuperUsersProp(), String.join(";", superUsers));
        }
        return props;
    }

    /**
     * Starts the quorum with no extra super users.
     *
     * @param controllerOverrides controller config overrides; only applied in KRaft mode
     */
    public synchronized void startQuorum(Properties controllerOverrides) {
        this.startQuorum(controllerOverrides, Collections.<String>emptySet());
    }

    /**
     * Starts the quorum of the embedded cluster. In KRaft mode a controller config is assembled
     * from the common configs, a PLAINTEXT controller listener and the given overrides (overrides
     * win); otherwise the embedded cluster's no-argument quorum start is used and both arguments
     * are ignored.
     *
     * @param controllerOverrides  controller config overrides applied last
     * @param additionalSuperUsers principals to make super users besides the broker principal
     */
    public synchronized void startQuorum(Properties controllerOverrides, Set<String> additionalSuperUsers) {
        if (!this.isKRaft()) {
            this.kafkaCluster.startQuorum();
            return;
        }
        Properties quorumProps = this.commonConfigs(additionalSuperUsers);
        quorumProps.setProperty(KafkaConfig.ListenersProp(), "CONTROLLER://localhost:0");
        quorumProps.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), "CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT");
        quorumProps.putAll(controllerOverrides);
        log.debug("Initializing kraft controller quorum with config {}", (Object) quorumProps);
        this.kafkaCluster.startQuorum(quorumProps);
    }

    /**
     * Starts all brokers with no extra super users.
     *
     * @param brokerOverrides broker config overrides applied on top of the defaults
     */
    public synchronized void startBrokers(Properties brokerOverrides) {
        this.startBrokers(brokerOverrides, Collections.<String>emptySet());
    }

    /**
     * Starts {@code numberOfBrokers} brokers. Each broker gets the common configs, an INTERNAL
     * (PLAINTEXT) and an EXTERNAL (SASL_PLAINTEXT, SCRAM-SHA-256) listener, the multi-tenant
     * interceptor on the external listener, reduced thread counts, and finally the caller's
     * overrides. Broker ids are consecutive, starting at the broker id found in the merged config
     * (default 0). When rack or cell lists are non-empty, the entry at the broker's position is
     * applied to that broker.
     *
     * @param brokerOverrides      broker config overrides applied last
     * @param additionalSuperUsers principals to make super users besides the broker principal
     */
    public synchronized void startBrokers(Properties brokerOverrides, Set<String> additionalSuperUsers) {
        Properties props = this.commonConfigs(additionalSuperUsers);

        // Listeners and authentication on the external listener.
        props.setProperty(KafkaConfig.ListenersProp(), "INTERNAL://localhost:0,EXTERNAL://localhost:0");
        props.setProperty(KafkaConfig.SaslEnabledMechanismsProp(), "SCRAM-SHA-256");
        props.setProperty("listener.name.external.scram-sha-256.sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required;");
        props.setProperty("listener.name.external.broker.interceptor.class", MultiTenantInterceptor.class.getName());
        props.setProperty("multitenant.metadata.class", DummyMultitenantMetadata.class.getName());

        // Buffer size and thread counts.
        props.setProperty(KafkaConfig.LogCleanerDedupeBufferSizeProp(), "2097152");
        props.setProperty(KafkaConfig.NumNetworkThreadsProp(), "2");
        props.setProperty(KafkaConfig.NumIoThreadsProp(), "3");
        props.setProperty(KafkaConfig.BackgroundThreadsProp(), "2");
        props.setProperty(KafkaConfig.GroupMetadataLoadThreadsProp(), "1");

        boolean kraft = this.isKRaft();
        String protocolMap = kraft
                ? "INTERNAL:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT"
                : "INTERNAL:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT";
        props.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp(), protocolMap);
        if (kraft) {
            // In KRaft mode SCRAM credentials are served from the in-memory test callback handler.
            KRaftScramIntegrationTestCallbackHandler.initialize();
            props.setProperty("listener.name.external.scram-sha-256.sasl.server.callback.handler.class", KRaftScramIntegrationTestCallbackHandler.class.getName());
        }

        // Caller overrides take precedence over everything set above.
        props.putAll(brokerOverrides);
        log.debug("Initiating Kafka cluster startup with broker config {}", (Object) props);

        int firstBrokerId = Integer.parseInt(props.getOrDefault(KafkaConfig.BrokerIdProp(), "0").toString());
        int position = 0;
        while (position < this.numberOfBrokers) {
            if (!this.brokerRacks.isEmpty()) {
                props.put(KafkaConfig.RackProp(), this.brokerRacks.get(position));
            }
            if (!this.brokerCells.isEmpty()) {
                props.put(KafkaConfig.BrokerTagsProp() + "." + "confluent.cell", this.brokerCells.get(position));
            }
            this.kafkaCluster.startBroker(firstBrokerId + position, props);
            position++;
        }
    }

    /**
     * Looks up the {@code BasePhysicalClusterMetadata} instance for every broker session and
     * asserts that each is a {@code TopicBasedPhysicalClusterMetadata}.
     *
     * @return one metadata instance per broker session UUID
     */
    public List<BasePhysicalClusterMetadata> clusterMetadataInstances() {
        List<BasePhysicalClusterMetadata> metadataPerSession = this.brokerSessionUuids().stream()
                .map(BasePhysicalClusterMetadata::getInstance)
                .collect(Collectors.toList());
        for (BasePhysicalClusterMetadata metadata : metadataPerSession) {
            Assertions.assertTrue(metadata instanceof TopicBasedPhysicalClusterMetadata, "Expected valid instance of TopicBasedPhysicalClusterMetadata for all broker sessions");
        }
        return metadataPerSession;
    }

    /**
     * Looks up the {@code TopicBasedTrafficNetworkIdRoutesStore} instance for every broker session
     * and asserts that each lookup produced an instance.
     *
     * @return one routes store per broker session UUID
     */
    public List<TopicBasedTrafficNetworkIdRoutesStore> networkIdRoutesStoreInstances() {
        List<TopicBasedTrafficNetworkIdRoutesStore> storesPerSession = this.brokerSessionUuids().stream()
                .map(TopicBasedTrafficNetworkIdRoutesStore::getInstance)
                .collect(Collectors.toList());
        for (TopicBasedTrafficNetworkIdRoutesStore store : storesPerSession) {
            Assertions.assertTrue(store instanceof TopicBasedTrafficNetworkIdRoutesStore, "Expected valid instance of TopicBasedTrafficNetworkIdRoutesStore for all broker sessions but got " + store);
        }
        return storesPerSession;
    }

    /**
     * Looks up the {@code DefaultUserMetaDataStore} instance for every broker session and, in KRaft
     * mode, also for the controller's session, asserting that each lookup produced an instance.
     *
     * @return one store per session UUID (brokers first, then the KRaft controller if any)
     */
    public List<DefaultUserMetaDataStore> userMetaDataStoreInstances() {
        List<String> sessionUuids = this.brokerSessionUuids();
        if (this.isKRaft()) {
            // The controller has its own session UUID, read from its config values.
            Object controllerUuid = this.kafkaCluster().kraftController().config().values().get(KafkaConfig.BrokerSessionUuidProp());
            sessionUuids.add(controllerUuid.toString());
        }
        List<DefaultUserMetaDataStore> storesPerSession = sessionUuids.stream()
                .map(DefaultUserMetaDataStore::getInstance)
                .collect(Collectors.toList());
        for (DefaultUserMetaDataStore store : storesPerSession) {
            Assertions.assertTrue(store instanceof DefaultUserMetaDataStore, "Expected valid instance of DefaultUserMetaDataStore for all broker sessions but got " + store);
        }
        return storesPerSession;
    }

    /**
     * Collects the distinct session UUIDs configured on the running brokers and asserts there is
     * exactly one per expected broker. A broker without the property contributes an empty string.
     *
     * @return the distinct broker session UUIDs
     */
    private List<String> brokerSessionUuids() {
        List<String> distinctUuids = this.kafkaCluster().kafkaBrokers().stream()
                .map(broker -> {
                    Object configured = broker.config().values().get(KafkaConfig.BrokerSessionUuidProp());
                    if (configured != null) {
                        return configured.toString();
                    }
                    return "";
                })
                .distinct()
                .collect(Collectors.toList());
        Assertions.assertEquals(this.numberOfBrokers, distinctUuids.size(), "Expect each broker to have unique session UUID.");
        return distinctUuids;
    }

    /**
     * Closes the lazily created admin clients, shuts down the embedded cluster and removes this
     * instance from the static registry.
     *
     * <p>Each step runs even if an earlier one throws, so a failing client {@code close()} can no
     * longer leave the embedded cluster running. If several steps throw, the exception from the
     * last failing step is the one that propagates.
     */
    public synchronized void shutdown() {
        try {
            if (this.superAdminClient != null) {
                this.superAdminClient.close();
            }
        } finally {
            try {
                if (this.cloudAdmin != null) {
                    this.cloudAdmin.close();
                }
            } finally {
                try {
                    this.kafkaCluster.shutdown();
                } finally {
                    // Always deregister so a stale entry never resolves to a dead cluster.
                    instances.remove(this.physicalClusterId);
                }
            }
        }
    }

    /**
     * Returns the bootstrap servers string reported by the embedded cluster's no-argument
     * {@code bootstrapServers()}.
     */
    public String bootstrapServers() {
        return this.kafkaCluster.bootstrapServers();
    }

    /**
     * Returns the bootstrap servers string reported by the embedded cluster for the given listener.
     *
     * @param listenerName listener name, e.g. the "INTERNAL" or "EXTERNAL" listeners configured in
     *                     {@code startBrokers}
     */
    public String bootstrapServers(String listenerName) {
        return this.kafkaCluster.bootstrapServers(listenerName);
    }

    /** Returns the underlying embedded Kafka cluster. */
    public EmbeddedKafkaCluster kafkaCluster() {
        return this.kafkaCluster;
    }

    /**
     * Creates a logical cluster without organisation or environment ids.
     *
     * @param clusterId   id of the new logical cluster
     * @param adminUserId id of the admin user (created as a super user if unknown)
     * @param serviceIds  ids of additional users to add to the cluster
     * @return the newly registered logical cluster
     * @throws IllegalArgumentException if a logical cluster with that id already exists
     */
    public synchronized LogicalCluster createLogicalCluster(String clusterId, int adminUserId, Integer ... serviceIds) {
        return this.createLogicalCluster(clusterId, null, null, adminUserId, serviceIds);
    }

    /**
     * Creates and registers a logical cluster, then adds each listed user to it. The admin user
     * is looked up or created with the super-user flag set; every id in {@code serviceIds} is
     * looked up or created with it cleared.
     *
     * @param clusterId   id of the new logical cluster
     * @param orgId       organisation id, may be {@code null}
     * @param envId       environment id, may be {@code null}
     * @param adminUserId id of the admin user
     * @param serviceIds  ids of additional users to add to the cluster
     * @return the newly registered logical cluster
     * @throws IllegalArgumentException if a logical cluster with that id already exists
     */
    public synchronized LogicalCluster createLogicalCluster(String clusterId, String orgId, String envId, int adminUserId, Integer ... serviceIds) {
        boolean alreadyRegistered = this.logicalClusters.containsKey(clusterId);
        if (alreadyRegistered) {
            throw new IllegalArgumentException("Logical cluster " + clusterId + " already exists");
        }
        UserMetadata admin = this.getOrCreateUser(adminUserId, true);
        LogicalCluster created = new LogicalCluster(this, clusterId, orgId, envId, admin);
        this.logicalClusters.put(clusterId, created);
        for (int idx = 0; idx < serviceIds.length; idx++) {
            UserMetadata serviceUser = this.getOrCreateUser(serviceIds[idx], false);
            created.addUser(serviceUser);
        }
        return created;
    }

    /**
     * Returns the user with the given id, creating it if necessary. A newly created user is a
     * service account exactly when it is not a super user.
     *
     * @param userId      numeric user id
     * @param isSuperUser super-user flag used only when the user has to be created
     * @return the existing or newly created user
     */
    public synchronized UserMetadata getOrCreateUser(int userId, boolean isSuperUser) {
        return this.getOrCreateUser(userId, isSuperUser, !isSuperUser);
    }

    /**
     * Returns the user with the given id, creating it if necessary. New users get the API key
     * {@code "APIKEY" + userId} and a randomly suffixed API secret, and are indexed by both id and
     * API key. The flags are ignored when the user already exists.
     *
     * @param userId         numeric user id
     * @param isSuperUser    super-user flag for a newly created user
     * @param serviceAccount service-account flag for a newly created user
     * @return the existing or newly created user
     */
    public synchronized UserMetadata getOrCreateUser(int userId, boolean isSuperUser, boolean serviceAccount) {
        UserMetadata known = this.usersById.get(userId);
        if (known == null) {
            String key = "APIKEY" + userId;
            String secret = "APISECRET-" + this.random.nextLong();
            UserMetadata created = new UserMetadata(userId, key, secret, isSuperUser, serviceAccount);
            this.usersById.put(userId, created);
            this.usersByApiKey.put(key, created);
            return created;
        }
        return known;
    }

    /**
     * Builds the multi-tenant principal for a SASL user name of the form
     * {@code <clusterId>_<apiKey>}.
     *
     * <p>The API key must belong to a user created through {@code getOrCreateUser}, and the
     * cluster id must belong to a logical cluster registered through {@code createLogicalCluster}.
     * An unknown logical cluster is now reported with an {@code IllegalArgumentException} naming
     * the id, instead of surfacing as a bare {@code NullPointerException}.
     *
     * @param saslUserName the SASL user name presented by the client
     * @return the principal carrying the user's id and tenant metadata
     * @throws IllegalArgumentException if the name does not match the expected pattern, or the API
     *                                  key or logical cluster is unknown
     */
    public MultiTenantPrincipal principal(String saslUserName) {
        Matcher matcher = SASL_USERNAME_PATTERN.matcher(saslUserName);
        if (!matcher.matches()) {
            throw new IllegalArgumentException("Invalid SASL user name " + saslUserName);
        }
        String apiKey = matcher.group("apiKey");
        UserMetadata user = this.usersByApiKey.get(apiKey);
        if (user == null) {
            throw new IllegalArgumentException("APIKey not found " + apiKey);
        }
        String logicalClusterId = matcher.group("clusterId");
        // Single lookup: used both for the existence check and for the org/env properties below.
        LogicalCluster logicalCluster = this.logicalClusters.get(logicalClusterId);
        if (logicalCluster == null) {
            throw new IllegalArgumentException("Logical cluster not found " + logicalClusterId);
        }
        TenantMetadata tenantMetadata = new TenantMetadata.Builder(logicalClusterId, user.userResourceId())
                .serviceAccount(user.isServiceAccount())
                .healthcheckTenant(user.isSuperUser())
                .apiKeyAuthenticated(false)
                .build();
        tenantMetadata.updateOrgProperties(logicalCluster.orgId(), logicalCluster.envId());
        return new MultiTenantPrincipal(String.valueOf(user.userId()), saslUserName, tenantMetadata);
    }

    /**
     * Returns a plaintext client security spec pointing at the bootstrap servers of the
     * "INTERNAL" listener.
     */
    public ClientSecuritySpec internalListenerClientSpec() {
        return ClientSecuritySpec.plaintext(this.kafkaCluster.bootstrapServers("INTERNAL"));
    }

    /**
     * Creates an ACL command builder. In KRaft mode it is given the internal-listener client
     * spec; otherwise it is given the embedded cluster's ZooKeeper connect string.
     *
     * @return a new, pre-targeted ACL command builder
     */
    public AclCommandBuilder newAclCommand() {
        AclCommandBuilder builder = new AclCommandBuilder();
        if (!this.isKRaft()) {
            builder.setZookeeperConnect(this.kafkaCluster.zkConnect());
            return builder;
        }
        builder.setClientSecuritySpec(this.internalListenerClientSpec());
        return builder;
    }

    /**
     * Creates an ACL command builder that connects through the "EXTERNAL" listener using
     * SASL_PLAINTEXT, the "PLAIN" mechanism and the supplied JAAS configuration.
     *
     * @param jaasConfig JAAS configuration passed to the client security spec
     * @return a new ACL command builder targeting the external listener
     */
    public AclCommandBuilder newAclCommandWithExternalListener(String jaasConfig) {
        AclCommandBuilder builder = new AclCommandBuilder();
        String externalBootstrap = this.bootstrapServers("EXTERNAL");
        ClientSecuritySpec externalSpec = new ClientSecuritySpec(externalBootstrap, SecurityProtocol.SASL_PLAINTEXT, "PLAIN", jaasConfig);
        return builder.setClientSecuritySpec(externalSpec);
    }

    /**
     * Returns the shared admin client for the "INTERNAL" PLAINTEXT listener, creating it on first
     * use. The client is closed by {@code shutdown()}.
     *
     * @return the cached admin client
     */
    public synchronized AdminClient superAdminClient() {
        if (this.superAdminClient != null) {
            return this.superAdminClient;
        }
        String internalBootstrap = this.kafkaCluster.bootstrapServers("INTERNAL");
        this.superAdminClient = KafkaTestUtils.createAdminClient(internalBootstrap, SecurityProtocol.PLAINTEXT, "", "");
        return this.superAdminClient;
    }

    /**
     * Returns the shared {@code CloudAdmin} for the "INTERNAL" PLAINTEXT listener, creating it on
     * first use. The client is closed by {@code shutdown()}.
     *
     * @return the cached cloud admin client
     */
    public synchronized CloudAdmin superConfluentAdmin() {
        if (this.cloudAdmin != null) {
            return this.cloudAdmin;
        }
        String internalBootstrap = this.kafkaCluster.bootstrapServers("INTERNAL");
        this.cloudAdmin = KafkaTestUtils.createCloudAdmin(internalBootstrap, SecurityProtocol.PLAINTEXT, "", "");
        return this.cloudAdmin;
    }

    /** Returns whether the embedded cluster reports that it runs in KRaft mode. */
    public boolean isKRaft() {
        return this.kafkaCluster.isKRaft();
    }

    /**
     * Registers a SCRAM-SHA-256 user. In KRaft mode a credential with 4096 iterations is generated
     * and stored in the test callback handler's cache; otherwise creation is delegated to
     * {@code SecurityTestUtils.createScramUser}.
     *
     * @param username SCRAM user name
     * @param password SCRAM password
     * @throws RuntimeException wrapping {@code NoSuchAlgorithmException} if credential generation fails
     */
    public void addScramUser(String username, String password) {
        if (!this.isKRaft()) {
            SecurityTestUtils.createScramUser(this.kafkaCluster(), username, password);
            return;
        }
        ScramCredential generated;
        try {
            ScramFormatter formatter = new ScramFormatter(ScramMechanism.SCRAM_SHA_256);
            generated = formatter.generateCredential(password, 4096);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
        KRaftScramIntegrationTestCallbackHandler.addCredential(username, generated);
    }

    /**
     * Removes a SCRAM-SHA-256 user and notifies every broker session of the deletion. In KRaft
     * mode the credential is dropped from the test callback handler's cache; otherwise deletion
     * is delegated to {@code SecurityTestUtils.deleteScramUser}.
     *
     * @param username SCRAM user name to delete
     */
    public void deleteScramUser(String username) {
        String mechanism = ScramMechanism.SCRAM_SHA_256.mechanismName();
        PublicCredential deleted = PublicCredential.saslCredential(username, mechanism);
        if (!this.isKRaft()) {
            SecurityTestUtils.deleteScramUser(this.kafkaCluster, username);
        } else {
            KRaftScramIntegrationTestCallbackHandler.removeCredential(username);
        }
        // Tell each broker's session that the credential is gone.
        this.kafkaCluster.kafkaBrokers().forEach(server -> {
            String sessionUuid = (String) server.config().brokerSessionUuid();
            BrokerSession.session(sessionUuid).handleCredentialDelete(deleted);
        });
    }

    /**
     * SCRAM server callback handler used in KRaft mode. It serves credentials from a single
     * static in-memory cache that tests fill through {@code addCredential}.
     *
     * <p>The cache is created on first use under the class lock, so adding or removing a
     * credential, or constructing the handler, before {@code initialize()} has been called no
     * longer fails on a null cache. The cache is static and never cleared, so it is shared by
     * every {@code PhysicalCluster} in the JVM.
     */
    public static class KRaftScramIntegrationTestCallbackHandler extends ScramServerCallbackHandler {
        private static CredentialCache.Cache<ScramCredential> credentialCache;

        /** Returns the shared cache, creating it if it does not exist yet. */
        private static synchronized CredentialCache.Cache<ScramCredential> cache() {
            if (credentialCache == null) {
                credentialCache = new CredentialCache.Cache<>(ScramCredential.class);
            }
            return credentialCache;
        }

        /** Ensures the shared credential cache exists; calling it repeatedly is harmless. */
        static void initialize() {
            cache();
        }

        /**
         * Stores (or replaces) the credential for a user.
         *
         * @param username   SCRAM user name
         * @param credential credential to serve for that user
         */
        static void addCredential(String username, ScramCredential credential) {
            cache().put(username, credential);
        }

        /**
         * Removes the credential for a user, if present.
         *
         * @param username SCRAM user name
         */
        static void removeCredential(String username) {
            cache().remove(username);
        }

        /** Creates a handler backed by the shared cache and no delegation-token cache. */
        public KRaftScramIntegrationTestCallbackHandler() {
            super(cache(), null);
        }
    }

    public static class MultiTenantScramPrincipalBuilder
    extends MultiTenantPrincipalBuilder
    implements Configurable {
        PhysicalCluster physicalCluster;

        public void configure(Map<String, ?> configs) {
            this.physicalCluster = (PhysicalCluster)instances.get((String)configs.get("physical.cluster.id"));
            Assertions.assertNotNull((Object)this.physicalCluster, (String)"Physical cluster not found");
        }

        public KafkaPrincipal build(AuthenticationContext context) {
            if (context.securityProtocol() == SecurityProtocol.SASL_PLAINTEXT) {
                SaslAuthenticationContext saslContext = (SaslAuthenticationContext)context;
                String authzId = saslContext.server().getAuthorizationID();
                return this.physicalCluster.principal(authzId);
            }
            if (context.securityProtocol() == SecurityProtocol.PLAINTEXT) {
                return BROKER_PRINCIPAL;
            }
            return super.build(context);
        }
    }
}

