/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.server.plugins.auth;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.multitenant.metrics.ApiKeyConnectionSensorBuilder;
import io.confluent.kafka.multitenant.utils.Utils;
import io.confluent.kafka.server.plugins.auth.MultiTenantSaslConfigEntry;
import io.confluent.kafka.server.plugins.auth.MultiTenantSaslSecrets;
import io.confluent.kafka.server.plugins.auth.PlainSaslAuthenticator;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.server.BrokerSession;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.SecretsLogFailedException;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.network.PublicCredential;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.server.multitenant.MultiTenantSecretsStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MultiTenantSaslSecretsStore
implements MultiTenantSecretsStore {
    private static final Map<String, MultiTenantSaslSecretsStore> INSTANCES = new HashMap<String, MultiTenantSaslSecretsStore>();
    private static final Logger LOG = LoggerFactory.getLogger(MultiTenantSaslSecretsStore.class);
    static final String METRICS_GROUP = "tenant-metrics";
    private List<String> multitenantListenerNames = Collections.emptyList();
    private final ObjectMapper objectMapper;
    private String sessionUuid;
    private String topicName;
    private Long topicLoadTimeoutMs;
    private final Map<String, ?> baseClientProperties;
    private final Metrics metrics;
    private KafkaBasedLog<String, String> secretsLog;
    private final ConcurrentHashMap<String, MultiTenantSaslConfigEntry> secretsMap;
    private final Map<String, String> resourceIdToUserId;
    private final Map<String, String> userIdToResourceId;
    private final MultiTenantSaslSecrets secrets;
    private final AtomicReference<State> state;
    private final MetricName apiKeyCountMetricName;
    private final Sensor apiKeyCreationSensor;
    private final Sensor apiKeyDeletionSensor;
    private final Sensor topicLoadTimeSensor;
    final Map<String, Long> lastSequenceId;

    public MultiTenantSaslSecretsStore(Map<String, Object> clientConfigs, Metrics metrics) {
        LOG.trace("Creating MultiTenantSaslSecretsStore");
        this.objectMapper = new ObjectMapper();
        this.baseClientProperties = clientConfigs;
        this.metrics = metrics;
        this.secretsMap = new ConcurrentHashMap();
        this.resourceIdToUserId = new ConcurrentHashMap<String, String>();
        this.userIdToResourceId = new ConcurrentHashMap<String, String>();
        this.secrets = new MultiTenantSaslSecrets(this.secretsMap);
        this.state = new AtomicReference<State>(State.NOT_RUNNING);
        this.lastSequenceId = new HashMap<String, Long>();
        this.apiKeyCountMetricName = metrics.metricName("active-api-key-count", METRICS_GROUP, "The number of active API keys.");
        metrics.addMetric(this.apiKeyCountMetricName, (config, now) -> this.secrets.entries().size());
        this.apiKeyCreationSensor = metrics.sensor("ApiKeyCreation");
        this.apiKeyCreationSensor.add(metrics.metricName("api-key-creation-rate", METRICS_GROUP, "The rate of new API key creation."), (MeasurableStat)new Rate());
        this.apiKeyDeletionSensor = metrics.sensor("ApiKeyDeletion");
        this.apiKeyDeletionSensor.add(metrics.metricName("api-key-deletion-rate", METRICS_GROUP, "The rate of API key deletion."), (MeasurableStat)new Rate());
        this.topicLoadTimeSensor = metrics.sensor("ApiKeyTopicLoadTime");
        this.topicLoadTimeSensor.add(metrics.metricName("api-key-topic-load-time", METRICS_GROUP, "The loading time for the api key topic."), (MeasurableStat)new Max());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void configure(Map<String, ?> config) {
        this.sessionUuid = Utils.getBrokerSessionUuid(config);
        this.multitenantListenerNames = ConfluentConfigs.multitenantListenerNames(config, null);
        this.topicName = this.getTopicName(config);
        this.topicLoadTimeoutMs = this.getTopicLoadTimeout(config);
        this.secretsLog = this.createKafkaBasedLog(config);
        Map<String, MultiTenantSaslSecretsStore> map = INSTANCES;
        synchronized (map) {
            MultiTenantSaslSecretsStore instance = INSTANCES.get(this.sessionUuid);
            if (instance != null) {
                if (this != instance) {
                    throw new UnsupportedOperationException("MultiTenantSaslSecretsStore instance already exists for broker session " + this.sessionUuid);
                }
                LOG.info("Skipping configuring this instance (broker session {}): Already configured.", (Object)this.sessionUuid);
                return;
            }
            INSTANCES.put(this.sessionUuid, this);
        }
        LOG.info("Configured MultiTenantSaslSecretsStore instance (broker session {})", (Object)this.sessionUuid);
    }

    void configure(KafkaBasedLog<String, String> secretsLog, List<String> multitenantListenerNames) {
        LOG.warn("configure(KafkaBasedLog<>) called, shouldn't happen outside tests (Ignore if this is a unittest.)");
        this.topicName = "unused_var-secretsLog-passed-into-ctor";
        this.topicLoadTimeoutMs = ConfluentConfigs.CDC_TOPIC_LOAD_TIMEOUT_MS_DEFAULT;
        this.multitenantListenerNames = multitenantListenerNames;
        this.secretsLog = secretsLog;
    }

    private KafkaBasedLog<String, String> createKafkaBasedLog(Map<String, ?> config) {
        Map<String, Object> consumerProps = this.getConsumerConfig(config);
        consumerProps.put("allow.auto.create.topics", false);
        consumerProps.put("key.deserializer", StringDeserializer.class.getName());
        consumerProps.put("value.deserializer", StringDeserializer.class.getName());
        return new KafkaBasedLog(this.topicName, null, consumerProps, (Callback)new ConsumeCallback(), Time.SYSTEM, null, this.topicLoadTimeoutMs.longValue());
    }

    public Map<Endpoint, CompletableFuture<Void>> start(Collection<Endpoint> endpoints) {
        CompletableFuture<Void> logStartedFuture;
        if (this.state.compareAndSet(State.NOT_RUNNING, State.STARTING)) {
            try {
                logStartedFuture = CompletableFuture.runAsync(() -> this.startLog());
            }
            catch (Exception e2) {
                this.state.set(State.FAILED_TO_START);
                throw new IllegalStateException("Unable to create a future for startLog()", e2);
            }
        } else {
            throw new IllegalStateException("Trying to start a MultiTenantSaslSecretsStore which was already started!");
        }
        return endpoints.stream().collect(Collectors.toMap(Function.identity(), e -> e.listenerName().map(this.multitenantListenerNames::contains).orElse(false) != false ? logStartedFuture : CompletableFuture.completedFuture(null)));
    }

    public void close() {
        LOG.info("Closing Multi-tenant Sasl Secrets Store from topic: {}", (Object)this.topicName);
        if (this.sessionUuid != null) {
            this.close(this.sessionUuid);
        }
        this.metrics.removeMetric(this.apiKeyCountMetricName);
        this.metrics.removeSensor(this.apiKeyCreationSensor.name());
        this.metrics.removeSensor(this.apiKeyDeletionSensor.name());
        this.metrics.removeSensor(this.topicLoadTimeSensor.name());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void close(String brokerSessionUuid) {
        boolean shouldStop = false;
        Map<String, MultiTenantSaslSecretsStore> map = INSTANCES;
        synchronized (map) {
            MultiTenantSaslSecretsStore instance = INSTANCES.get(brokerSessionUuid);
            if (instance == this) {
                INSTANCES.remove(brokerSessionUuid);
                shouldStop = true;
                LOG.info("Removed {} instance for broker session {}", (Object)this.getClass().getName(), (Object)brokerSessionUuid);
            } else if (instance != null) {
                LOG.info("Closing instance that doesn't match the instance in the static map with the same broker session {} will not remove that instance from the map.", (Object)brokerSessionUuid);
            }
        }
        if (shouldStop && this.state.compareAndSet(State.RUNNING, State.SHUTTING_DOWN)) {
            try {
                this.secretsLog.stop();
            }
            finally {
                this.state.set(State.SHUTDOWN_COMPLETE);
            }
        }
    }

    public MultiTenantSaslSecrets load() {
        return this.state.get() == State.RUNNING ? this.secrets : null;
    }

    private void createSensors(Map<String, MultiTenantSaslConfigEntry> entries) {
        entries.forEach((username, userInfo) -> new ApiKeyConnectionSensorBuilder(this.metrics, PlainSaslAuthenticator.multiTenantPrincipal(username, userInfo)).build());
    }

    private void startLog() {
        try {
            long loadStartNanos = System.nanoTime();
            this.secretsLog.start();
            this.state.set(State.RUNNING);
            long loadTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - loadStartNanos);
            this.topicLoadTimeSensor.record((double)loadTimeMs);
            LOG.info("Started MultiTenantSaslSecretsStore from topic: {} in {} ms", (Object)this.topicName, (Object)loadTimeMs);
            CompletableFuture.runAsync(() -> this.createSensors(this.secrets.entries()));
        }
        catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new SecretsLogFailedException("Unable to start the consumer for MultiTenantSaslSecretsStore", (Throwable)e);
        }
    }

    void read(ConsumerRecord<String, String> record) {
        String key = (String)record.key();
        if (key == null) {
            LOG.error("Record is missing a key (which is a must!). Ignoring this record.");
            return;
        }
        Long sequenceId = Utils.tryParseEventsSequenceId(record);
        if (sequenceId == null) {
            LOG.error("Sequence ID is missing from the headers of record with key {}. Ignoring this record.", (Object)key);
            return;
        }
        this.updateSecrets(record, sequenceId);
        LOG.trace("Finished reading record with sequence id: {}", (Object)sequenceId);
    }

    boolean validateEntries(String key, Map<String, MultiTenantSaslConfigEntry> entries) {
        return entries.size() == 1 && entries.keySet().stream().allMatch(k -> k.equals(key));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized void updateSecrets(ConsumerRecord<String, String> record, long currSeqId) {
        String key = (String)record.key();
        String secretsMsg = (String)record.value();
        Long prevSeqId = this.lastSequenceId.get(key);
        if (prevSeqId != null && prevSeqId >= currSeqId) {
            LOG.warn("Ignoring older message for key {} with sequence id: {} (last seen id is {})", new Object[]{key, currSeqId, prevSeqId});
            return;
        }
        try {
            if (secretsMsg != null) {
                this.maybeLogApiKeyUpdate(key, currSeqId, true);
                MultiTenantSaslSecrets parsed = (MultiTenantSaslSecrets)this.objectMapper.readValue(secretsMsg, MultiTenantSaslSecrets.class);
                if (!this.validateEntries(key, parsed.entries())) {
                    throw new IllegalStateException("Invalid secrets message for " + key);
                }
                this.secretsMap.putAll(parsed.entries());
                this.addToUserResourceMap(parsed.entries().values());
                this.apiKeyCreationSensor.record();
                if (this.state.get() == State.RUNNING) {
                    this.createSensors(parsed.entries());
                }
            } else {
                this.maybeLogApiKeyUpdate(key, currSeqId, false);
                MultiTenantSaslConfigEntry deletedEntry = this.secretsMap.remove(key);
                this.removeFromUserResourceMap(deletedEntry);
                this.deleteCredential(key);
                this.apiKeyDeletionSensor.record();
            }
        }
        catch (Exception e) {
            LOG.error("Error handling message for api key: {}, sequence id: {}, partition: {}, timestamp: {}", new Object[]{key, currSeqId, record.partition(), record.timestamp(), e});
        }
        finally {
            this.lastSequenceId.put(key, currSeqId);
        }
    }

    private synchronized void addToUserResourceMap(Collection<MultiTenantSaslConfigEntry> configEntries) {
        try {
            for (MultiTenantSaslConfigEntry config : configEntries) {
                if (config.userResourceId == null) continue;
                this.resourceIdToUserId.put(config.userResourceId(), config.userId());
                this.userIdToResourceId.put(config.userId(), config.userResourceId());
            }
        }
        catch (Exception e) {
            LOG.error("Ran into an exception while adding to user id <-> resource id maps.", (Throwable)e);
        }
    }

    private synchronized void removeFromUserResourceMap(MultiTenantSaslConfigEntry deletedEntry) {
        try {
            Optional<MultiTenantSaslConfigEntry> existingEntry;
            if (deletedEntry != null && !(existingEntry = this.secretsMap.values().stream().filter(multiTenantSaslConfigEntry -> multiTenantSaslConfigEntry.userId.equals(deletedEntry.userId)).findFirst()).isPresent() && deletedEntry.userResourceId != null) {
                this.resourceIdToUserId.remove(deletedEntry.userResourceId());
                this.userIdToResourceId.remove(deletedEntry.userId());
            }
        }
        catch (Exception e) {
            LOG.error("Ran into an exception while deleting from user id <-> resource id maps.", (Throwable)e);
        }
    }

    private void maybeLogApiKeyUpdate(String key, long sequenceId, boolean isUpdate) {
        String apiKeyUpdateLogMessage;
        String string = apiKeyUpdateLogMessage = isUpdate ? "Updating api keys for {} from topic (sequence id: {})" : "Read null value for key {}, deleting from key store (sequence id: {})";
        if (State.RUNNING.equals((Object)this.state.get())) {
            LOG.info(apiKeyUpdateLogMessage, (Object)key, (Object)sequenceId);
        } else {
            LOG.debug(apiKeyUpdateLogMessage, (Object)key, (Object)sequenceId);
        }
    }

    synchronized void deleteCredential(String deletedKey) {
        BrokerSession session;
        BrokerSession brokerSession = session = this.sessionUuid != null ? BrokerSession.session((String)this.sessionUuid) : null;
        if (session != null) {
            LOG.trace("Deleting API key {} for broker session {}", (Object)deletedKey, (Object)this.sessionUuid);
            session.handleCredentialDelete(PublicCredential.saslCredential((String)deletedKey, (String)"PLAIN"));
        } else {
            LOG.warn("Ignoring deleted API key {} because broker session {} is not available.", (Object)deletedKey, (Object)this.sessionUuid);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static MultiTenantSaslSecretsStore getInstance(String brokerSessionUuid) {
        Map<String, MultiTenantSaslSecretsStore> map = INSTANCES;
        synchronized (map) {
            return INSTANCES.get(brokerSessionUuid);
        }
    }

    public synchronized Optional<String> userId(String userResourceId) {
        return Optional.ofNullable(this.resourceIdToUserId.get(userResourceId));
    }

    public synchronized Optional<String> userResourceId(String userId) {
        return Optional.ofNullable(this.userIdToResourceId.get(userId));
    }

    private Map<String, Object> getConsumerConfig(Map<String, ?> config) {
        HashSet configNames = new HashSet(ConsumerConfig.configNames());
        configNames.remove("metric.reporters");
        HashMap<String, Object> clientConfigs = new HashMap<String, Object>(this.baseClientProperties);
        clientConfigs.keySet().retainAll(configNames);
        clientConfigs.put("client.id", String.format("%s-%s-%s", this.topicName, ConfluentConfigs.ClientType.CONSUMER, config.get(KafkaConfig.BrokerIdProp())));
        clientConfigs.put("bootstrap.servers", this.baseClientProperties.get("bootstrap.servers"));
        return clientConfigs;
    }

    private String getTopicName(Map<String, ?> config) {
        String topicFromConfig = (String)config.get("confluent.cdc.api.keys.topic");
        if (topicFromConfig == null || topicFromConfig.isEmpty()) {
            throw new ConfigException("Missing value of config: confluent.cdc.api.keys.topic");
        }
        return topicFromConfig;
    }

    private Long getTopicLoadTimeout(Map<String, ?> config) {
        Long topicLoadTimeoutMs = (Long)config.get("confluent.cdc.api.keys.load.timeout.ms");
        if (topicLoadTimeoutMs == null || topicLoadTimeoutMs <= 0L) {
            throw new ConfigException("Missing value of config: confluent.cdc.api.keys.load.timeout.ms");
        }
        return topicLoadTimeoutMs;
    }

    public static enum State {
        NOT_RUNNING(0),
        STARTING(1),
        RUNNING(2),
        SHUTTING_DOWN(3),
        SHUTDOWN_COMPLETE(4),
        FAILED_TO_START(5);

        private final byte value;

        private State(byte value) {
            this.value = value;
        }

        public byte value() {
            return this.value;
        }
    }

    private class ConsumeCallback
    implements Callback<ConsumerRecord<String, String>> {
        private ConsumeCallback() {
        }

        public void onCompletion(Throwable error, ConsumerRecord<String, String> record) {
            if (error != null) {
                LOG.error("Unexpected error in consumer callback for MultiTenantSaslSecretsStore: ", error);
                return;
            }
            MultiTenantSaslSecretsStore.this.read(record);
        }
    }
}

