/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.quota;

import com.google.protobuf.InvalidProtocolBufferException;
import io.confluent.kafka.multitenant.quota.QuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.multitenant.utils.Utils;
import io.confluent.protobuf.cloud.events.v1.ClientQuotaKey;
import io.confluent.protobuf.cloud.events.v1.ClientQuotaValue;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.apache.kafka.server.quota.MultiTenantQuotaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TenantClientQuotaConsumer
implements MultiTenantQuotaConsumer {
    private static final Map<String, TenantClientQuotaConsumer> INSTANCES = new HashMap<String, TenantClientQuotaConsumer>();
    private static final Logger LOG = LoggerFactory.getLogger(TenantClientQuotaConsumer.class);
    static final String DEFAULT_TENANT_QUOTA_KEY = "<default>";
    final AtomicReference<State> state = new AtomicReference<State>(State.NOT_STARTED);
    private final Time time;
    private String sessionUuid;
    private final Map<String, ?> interBrokerClientConfig;
    private final Map<String, Map<String, TenantCacheEntry>> tenantsCache = new ConcurrentHashMap<String, Map<String, TenantCacheEntry>>();
    private KafkaBasedLog<byte[], byte[]> clientQuotasLog;

    public TenantClientQuotaConsumer(Map<String, ?> interBrokerClientConfig, Metrics metrics) {
        this(interBrokerClientConfig, metrics, Time.SYSTEM);
    }

    public TenantClientQuotaConsumer(Map<String, ?> interBrokerClientConfig, Metrics metrics, Time time) {
        this.time = time;
        this.interBrokerClientConfig = interBrokerClientConfig;
    }

    private boolean checkAndSetEnabledState(Map<String, ?> configs) {
        if (!this.state.get().equals((Object)State.NOT_STARTED)) {
            throw new IllegalStateException("checkAndSetEnabledState called in a non-starting state. Ignoring");
        }
        Boolean enabled = (Boolean)configs.get("confluent.cdc.client.quotas.enable");
        if (enabled == null || !enabled.booleanValue()) {
            LOG.info("Loading client quotas from the sync pipelines is disabled in config {}", (Object)"confluent.cdc.client.quotas.enable");
            this.state.set(State.NOT_ENABLED);
            return false;
        }
        return true;
    }

    public void configure(KafkaBasedLog<byte[], byte[]> log, String sessionUuid) {
        LOG.warn("configure(KafkaBasedLog<>, ...) called, which should only happen in tests (ignore if this is one)");
        this.sessionUuid = sessionUuid;
        this.addInstance(sessionUuid);
        this.clientQuotasLog = log;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean addInstance(String sessionUuid) {
        Map<String, TenantClientQuotaConsumer> map = INSTANCES;
        synchronized (map) {
            TenantClientQuotaConsumer instance = INSTANCES.get(sessionUuid);
            if (instance == null) {
                INSTANCES.put(sessionUuid, this);
                LOG.info("Configured an instance for broker session {}", (Object)sessionUuid);
                return true;
            }
            if (this != instance) {
                throw new IllegalStateException("Another instance already exists for broker session " + sessionUuid);
            }
            return false;
        }
    }

    public void configure(Map<String, ?> configs) {
        this.sessionUuid = Utils.getBrokerSessionUuid(configs);
        if (!this.addInstance(this.sessionUuid)) {
            LOG.info("Skipping configuring this instance is it is already configured for broker session {}", (Object)this.sessionUuid);
            return;
        }
        if (!this.checkAndSetEnabledState(configs)) {
            return;
        }
        this.clientQuotasLog = this.configureConsumer(configs, this.interBrokerClientConfig);
    }

    private KafkaBasedLog<byte[], byte[]> configureConsumer(Map<String, ?> configs, Map<String, ?> interBrokerClientConfigs) {
        State s = this.state.get();
        if (!s.equals((Object)State.NOT_STARTED)) {
            throw new IllegalStateException("configureConsumer called in a state it can't start in: " + (Object)((Object)s));
        }
        String topic = (String)configs.get("confluent.cdc.client.quotas.topic.name");
        if (topic == null || topic.isEmpty()) {
            throw new ConfigException("Value for config confluent.cdc.client.quotas.topic.name can not be empty when client quotas are enabled");
        }
        String clientId = String.format("%s-%s-%s", topic, ConfluentConfigs.ClientType.CONSUMER, configs.get(KafkaConfig.BrokerIdProp()));
        Long timeoutMs = (Long)configs.get("confluent.cdc.api.keys.load.timeout.ms");
        if (timeoutMs == null || timeoutMs <= 0L) {
            throw new ConfigException("Value for config confluent.cdc.api.keys.load.timeout.ms must be positive integer when using client quotas");
        }
        HashSet consumerConfigNames = new HashSet(ConsumerConfig.configNames());
        consumerConfigNames.remove("metric.reporters");
        HashMap consumerProps = new HashMap(interBrokerClientConfigs);
        consumerProps.keySet().retainAll(consumerConfigNames);
        consumerProps.put("client.id", clientId);
        consumerProps.put("bootstrap.servers", interBrokerClientConfigs.get("bootstrap.servers"));
        consumerProps.put("allow.auto.create.topics", false);
        consumerProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
        LOG.debug("Creating consumer with properties: {}", consumerProps);
        return new KafkaBasedLog(topic, null, consumerProps, () -> null, (Callback)new ConsumeCallback(), this.time, null, timeoutMs.longValue());
    }

    public void close() {
        if (this.sessionUuid == null) {
            LOG.warn("close() called without configure() being called first");
            return;
        }
        LOG.info("Closing consumer for session {}", (Object)this.sessionUuid);
        this.close(this.sessionUuid);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void close(String brokerSessionUuid) {
        Map<String, TenantClientQuotaConsumer> map = INSTANCES;
        synchronized (map) {
            TenantClientQuotaConsumer instance = INSTANCES.get(brokerSessionUuid);
            if (instance != this) {
                LOG.error("Closing instance that doesn't match the instance in the static map with the same broker session {}. Will not close this instance or remove it from the map", (Object)brokerSessionUuid);
                return;
            }
            INSTANCES.remove(brokerSessionUuid);
            LOG.info("Removed instance for broker session {}", (Object)brokerSessionUuid);
        }
        this.stopLog();
    }

    private void stopLog() {
        State s = this.state.get();
        if (s.equals((Object)State.NOT_ENABLED) || s.equals((Object)State.FAILED_TO_START)) {
            return;
        }
        try {
            State prevState = this.state.getAndSet(State.CLOSED);
            if (prevState.equals((Object)State.RUNNING) || prevState.equals((Object)State.STARTING)) {
                this.clientQuotasLog.stop();
                LOG.info("Successfully closed the client quota consumer");
                return;
            }
            LOG.debug("Asked to close from a non-running state {}", (Object)prevState);
        }
        catch (Exception e) {
            LOG.error("Error when shutting down the consumer", (Throwable)e);
        }
    }

    void consume(ConsumerRecord<byte[], byte[]> record) {
        Long sequenceId = Utils.tryParseEventsSequenceId(record);
        if (sequenceId == null) {
            LOG.error("Missing sequence ID in client quotas message! (partition = {}, offset = {}, timestamp = {})", new Object[]{record.partition(), record.offset(), record.timestamp()});
            return;
        }
        ClientQuotaKey key = this.parseKey(record, sequenceId);
        if (key == null) {
            return;
        }
        if (!this.verifySequenceId(sequenceId, key.getClusterId(), key.getPrincipal())) {
            return;
        }
        if (record.value() == null) {
            this.deleteQuotas(sequenceId, key.getClusterId(), key.getPrincipal());
            return;
        }
        ClientQuotaValue value = this.parseValue(record, sequenceId, key);
        if (value == null) {
            return;
        }
        this.updateQuotas(sequenceId, key.getClusterId(), key.getPrincipal(), value.getIngressBytesRate(), value.getEgressBytesRate());
    }

    private ClientQuotaKey parseKey(ConsumerRecord<byte[], byte[]> record, long sequenceId) {
        ClientQuotaKey clientQuotaKey;
        if (record.key() == null) {
            LOG.error("Missing key in client quotas message! (partition = {}, offset = {}, timestamp = {}, sequence id = {})", new Object[]{record.partition(), record.offset(), record.timestamp(), sequenceId});
            return null;
        }
        try {
            clientQuotaKey = ClientQuotaKey.parseFrom((byte[])record.key());
        }
        catch (InvalidProtocolBufferException ipbe) {
            LOG.error(String.format("Can't parse key in client quotas message! (partition = %d, offset = %d, timestamp = %d, sequence id = %s)", record.partition(), record.offset(), record.timestamp(), sequenceId), (Throwable)ipbe);
            return null;
        }
        if (clientQuotaKey.getClusterId() == null || clientQuotaKey.getClusterId().isEmpty()) {
            LOG.error("Missing cluster ID in client quotas message! (partition = {}, offset = {}, timestamp = {}, key = {})", new Object[]{record.partition(), record.offset(), record.timestamp(), clientQuotaKey});
            return null;
        }
        if (clientQuotaKey.getPrincipal() == null || clientQuotaKey.getPrincipal().isEmpty()) {
            LOG.error("Missing principal ID in client quotas message! (partition = {}, offset = {}, timestamp = {}, key = {})", new Object[]{record.partition(), record.offset(), record.timestamp(), clientQuotaKey});
            return null;
        }
        return clientQuotaKey;
    }

    private ClientQuotaValue parseValue(ConsumerRecord<byte[], byte[]> record, long sequenceId, ClientQuotaKey key) {
        ClientQuotaValue clientQuotaValue;
        try {
            clientQuotaValue = ClientQuotaValue.parseFrom((byte[])record.value());
        }
        catch (InvalidProtocolBufferException ipbe) {
            LOG.error(String.format("Can't parse value in client quotas message! (partition = %d, offset = %d, timestamp = %d, key = %s, sequence id = %d)", record.partition(), record.offset(), record.timestamp(), key, sequenceId), (Throwable)ipbe);
            return null;
        }
        if (clientQuotaValue.getEgressBytesRate() <= 0L || clientQuotaValue.getIngressBytesRate() <= 0L) {
            LOG.error("Non-positive quotas specified in client quotas message! (partition = {}, offset = {}, timestamp = {}, key = {}, value = {})", new Object[]{record.partition(), record.offset(), record.timestamp(), key, clientQuotaValue});
            return null;
        }
        return clientQuotaValue;
    }

    private Long getLastSeenSequenceId(String tenantId, String userResourceId) {
        TenantCacheEntry tce;
        Map<String, TenantCacheEntry> m = this.tenantsCache.get(tenantId);
        if (m != null && (tce = m.get(userResourceId)) != null) {
            return tce.getSequenceId();
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateTenantCache(String tenantId, String userResourceId, long sequenceId, long ingressByteRate, long egressByteRate) {
        Map<String, Map<String, TenantCacheEntry>> map = this.tenantsCache;
        synchronized (map) {
            Map<String, TenantCacheEntry> m = this.tenantsCache.get(tenantId);
            if (m == null) {
                m = new ConcurrentHashMap<String, TenantCacheEntry>();
                this.tenantsCache.put(tenantId, m);
            }
            m.put(userResourceId, new TenantCacheEntry(sequenceId, ingressByteRate, egressByteRate));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void removeFromTenantCache(String tenantId, String userResourceId, long sequenceId) {
        Map<String, Map<String, TenantCacheEntry>> map = this.tenantsCache;
        synchronized (map) {
            Map<String, TenantCacheEntry> m = this.tenantsCache.get(tenantId);
            if (m == null) {
                LOG.info("Got a message to delete a quota for tenant {} and user {}, but no entry to remove", (Object)tenantId, (Object)userResourceId);
                m = new ConcurrentHashMap<String, TenantCacheEntry>();
                this.tenantsCache.put(tenantId, m);
            }
            m.put(userResourceId, new TenantCacheEntry(sequenceId));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void updateTenantQuotasFromCache(String tenantId) {
        State s = this.state.get();
        if (!s.equals((Object)State.RUNNING)) {
            LOG.debug("updateTenantQuotasFromCache called while not running, current state is {}", (Object)s);
            return;
        }
        Map<String, Map<String, TenantCacheEntry>> map = this.tenantsCache;
        synchronized (map) {
            LOG.info("Updating client quotas for tenant = {}", (Object)tenantId);
            Map<String, TenantCacheEntry> tenantCacheEntry = this.tenantsCache.get(tenantId);
            if (tenantCacheEntry == null) {
                LOG.error("Unable to find a tenantsCache entry for {}. A possible race maybe?", (Object)tenantId);
                return;
            }
            QuotaConfig defaultUserQuota = QuotaConfig.UNLIMITED_QUOTA;
            TenantCacheEntry defaultQuotaEntry = tenantCacheEntry.get(DEFAULT_TENANT_QUOTA_KEY);
            if (defaultQuotaEntry != null) {
                defaultUserQuota = defaultQuotaEntry.toQuotaConfig();
            }
            Map<String, QuotaConfig> userQuotas = tenantCacheEntry.entrySet().stream().filter(e -> !((String)e.getKey()).equals(DEFAULT_TENANT_QUOTA_KEY)).collect(Collectors.toMap(Map.Entry::getKey, e -> ((TenantCacheEntry)e.getValue()).toQuotaConfig()));
            LOG.info("Updating quotas for tenant {}. userQuotas = {}, defaultUserQuota = {}", new Object[]{tenantId, userQuotas, defaultUserQuota});
            TenantQuotaCallback.updateUserQuotas(tenantId, userQuotas, defaultUserQuota);
        }
    }

    public CompletableFuture<Void> start() {
        if (this.state.get().equals((Object)State.NOT_ENABLED)) {
            LOG.debug("Trying to start from a non enabled state. Ignoring");
            return CompletableFuture.completedFuture(null);
        }
        if (!this.state.compareAndSet(State.NOT_STARTED, State.STARTING)) {
            throw new IllegalStateException("Trying to start a log from a state it can't be started in");
        }
        try {
            return CompletableFuture.runAsync(() -> this.startLog());
        }
        catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to create a future for startLog()", e);
        }
    }

    private void startLog() {
        if (!this.state.get().equals((Object)State.STARTING)) {
            throw new IllegalStateException("Trying to start log from a non starting state");
        }
        try {
            long startNs = this.time.nanoseconds();
            this.clientQuotasLog.start();
            this.state.set(State.RUNNING);
            this.tenantsCache.keySet().forEach(tenantId -> this.updateTenantQuotasFromCache((String)tenantId));
            long startLogNs = this.time.nanoseconds() - startNs;
            LOG.info("Consumed initial set of client quotas from topic took {} nanoseconds", (Object)startLogNs);
        }
        catch (Exception e) {
            this.state.set(State.FAILED_TO_START);
            throw new IllegalStateException("Unable to start consuming client quotas from topic", e);
        }
    }

    boolean verifySequenceId(long sequenceId, String tenantId, String userResourceId) {
        Long previousSequenceId = this.getLastSeenSequenceId(tenantId, userResourceId);
        if (previousSequenceId != null && sequenceId < previousSequenceId) {
            LOG.info("Received client quotas for (tenant = {}, user resource id = {}) with an earlier sequence id (last seen = {}, recent = {}), ignoring", new Object[]{tenantId, userResourceId, previousSequenceId, sequenceId});
            return false;
        }
        return true;
    }

    void updateQuotas(long sequenceId, String tenantId, String userResourceId, long ingressByteRate, long egressByteRate) {
        this.updateTenantCache(tenantId, userResourceId, sequenceId, ingressByteRate, egressByteRate);
        this.updateTenantQuotasFromCache(tenantId);
    }

    void deleteQuotas(long sequenceId, String tenantId, String userResourceId) {
        this.removeFromTenantCache(tenantId, userResourceId, sequenceId);
        this.updateTenantQuotasFromCache(tenantId);
    }

    private class ConsumeCallback
    implements Callback<ConsumerRecord<byte[], byte[]>> {
        private ConsumeCallback() {
        }

        public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
            if (error != null) {
                LOG.error("Unexpected error in ConsumeCallback for TenantClientQuotaConsumer", error);
                return;
            }
            TenantClientQuotaConsumer.this.consume(record);
        }
    }

    private static class TenantCacheEntry {
        private final long sequenceId;
        private final long ingressByteRate;
        private final long egressByteRate;

        public TenantCacheEntry(long sequenceId) {
            this.sequenceId = sequenceId;
            this.ingressByteRate = -1L;
            this.egressByteRate = -1L;
        }

        public TenantCacheEntry(long sequenceId, long ingressByteRate, long egressByteRate) {
            if (ingressByteRate <= 0L || egressByteRate <= 0L) {
                throw new IllegalArgumentException("Ingress and egress must be positive numbers");
            }
            this.sequenceId = sequenceId;
            this.ingressByteRate = ingressByteRate;
            this.egressByteRate = egressByteRate;
        }

        public long getSequenceId() {
            return this.sequenceId;
        }

        public QuotaConfig toQuotaConfig() {
            if (this.ingressByteRate == -1L && this.egressByteRate == -1L) {
                return QuotaConfig.UNLIMITED_QUOTA;
            }
            return new QuotaConfig(this.ingressByteRate, this.egressByteRate, null, null, QuotaConfig.UNLIMITED_QUOTA);
        }
    }

    public static enum State {
        NOT_STARTED,
        NOT_ENABLED,
        STARTING,
        RUNNING,
        CLOSED,
        FAILED_TO_START;

    }
}

