/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.quota;

import io.confluent.kafka.multitenant.quota.QuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantClientQuotaConsumer;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.protobuf.cloud.events.v1.ClientQuotaKey;
import io.confluent.protobuf.cloud.events.v1.ClientQuotaValue;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import kafka.server.KafkaConfig;
import kafka.utils.MockTime;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.util.KafkaBasedLog;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Answers;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

/**
 * Unit tests for {@link TenantClientQuotaConsumer}.
 *
 * <p>Every test runs with the static methods of {@link TenantQuotaCallback} mocked, so the
 * tests can observe which {@code updateUserQuotas} calls the consumer makes (and make that
 * call fail) without touching real quota state.
 */
public class TenantClientQuotaConsumerTest {
    private static final String TOPIC = "_confluent-client_quota";
    private static final String SESSION_UUID = "session-uuid";
    private static final String CLIENT_QUOTAS_ENABLE_CONFIG = "confluent.cdc.client.quotas.enable";
    private static final String CLUSTER_ID = "lkc-some";
    private static final String PRINCIPAL = "user-id";

    /** Static mock of {@link TenantQuotaCallback}; opened before and closed after each test. */
    private MockedStatic<TenantQuotaCallback> tenantQuotaCallback;

    /** Opens the static mock of {@link TenantQuotaCallback} for the current test. */
    @BeforeEach
    public void setUp() throws Exception {
        this.tenantQuotaCallback = Mockito.mockStatic(TenantQuotaCallback.class, Answers.RETURNS_SMART_NULLS);
    }

    /** Releases the static mock so it does not leak into other tests on the same thread. */
    @AfterEach
    public void tearDown() throws Exception {
        // Guard against setUp having failed before the mock was assigned; an NPE here would
        // only obscure the original failure.
        if (this.tenantQuotaCallback != null) {
            this.tenantQuotaCallback.close();
        }
    }

    /**
     * A consumer configured with the enable flag set to {@code false}, or with the flag mapped
     * to {@code null}, must report {@code NOT_ENABLED} after {@code configure} and must still
     * report {@code NOT_ENABLED} after {@code close}.
     */
    @Test
    public void testDisabledInConfig() {
        Map<String, Object> configs = new HashMap<>();
        Map<String, Object> interBrokerClientConfig = new HashMap<>();
        configs.put(KafkaConfig.BrokerSessionUuidProp(), SESSION_UUID);

        // Explicitly disabled.
        configs.put(CLIENT_QUOTAS_ENABLE_CONFIG, Boolean.FALSE);
        this.assertStaysNotEnabled(interBrokerClientConfig, configs);

        // Flag present but mapped to null.
        configs.put(CLIENT_QUOTAS_ENABLE_CONFIG, null);
        this.assertStaysNotEnabled(interBrokerClientConfig, configs);
    }

    /**
     * Creates a fresh consumer and asserts the state sequence
     * {@code NOT_STARTED -> configure -> NOT_ENABLED -> close -> NOT_ENABLED}.
     *
     * @param interBrokerClientConfig client configuration handed to the consumer's constructor
     * @param configs configuration map handed to {@code configure}
     */
    private void assertStaysNotEnabled(Map<String, Object> interBrokerClientConfig, Map<String, Object> configs) {
        TenantClientQuotaConsumer tcqc = new TenantClientQuotaConsumer(interBrokerClientConfig, new Metrics(), new MockTime());
        Assertions.assertEquals(TenantClientQuotaConsumer.State.NOT_STARTED, tcqc.state.get());
        tcqc.configure(configs);
        Assertions.assertEquals(TenantClientQuotaConsumer.State.NOT_ENABLED, tcqc.state.get());
        tcqc.close();
        Assertions.assertEquals(TenantClientQuotaConsumer.State.NOT_ENABLED, tcqc.state.get());
    }

    /**
     * Builds a consumer backed by a mocked {@link KafkaBasedLog}, configures it with
     * {@link #SESSION_UUID} and waits for {@code start()} to complete.
     *
     * @param metrics metrics registry passed to the consumer
     * @param time clock passed to the consumer
     * @return the started consumer
     */
    private TenantClientQuotaConsumer createAndStartConsumerWithMockLog(Metrics metrics, Time time) {
        TenantClientQuotaConsumer tcqc = new TenantClientQuotaConsumer(null, metrics, time);
        tcqc.configure(Mockito.mock(KafkaBasedLog.class), SESSION_UUID);
        tcqc.start().join();
        return tcqc;
    }

    /** Closes the consumer and asserts that it ends up in the {@code CLOSED} state. */
    private void closeConsumer(TenantClientQuotaConsumer tcqc) {
        tcqc.close();
        Assertions.assertEquals(TenantClientQuotaConsumer.State.CLOSED, tcqc.state.get());
    }

    /** Serializes a {@link ClientQuotaKey} with the given cluster id and principal. */
    private byte[] encodeClientQuotaKey(String clusterId, String principal) {
        return ClientQuotaKey.newBuilder()
                .setClusterId(clusterId)
                .setPrincipal(principal)
                .build()
                .toByteArray();
    }

    /** Serializes a {@link ClientQuotaValue} with the given ingress and egress byte rates. */
    private byte[] encodeClientQuotaValue(long ingressByteRate, long egressByteRate) {
        return ClientQuotaValue.newBuilder()
                .setIngressBytesRate(ingressByteRate)
                .setEgressBytesRate(egressByteRate)
                .build()
                .toByteArray();
    }

    /**
     * Creates a record on {@link #TOPIC} (partition 0, offset 0) whose headers are produced by
     * {@code KafkaTestUtils.createGoodSequenceIdRecordHeaders(seqId, true)}.
     *
     * @param seqId sequence id placed into the record headers
     * @param key serialized key bytes
     * @param value serialized value bytes; may be {@code null}
     * @return the assembled record
     */
    private ConsumerRecord<byte[], byte[]> createRecord(long seqId, byte[] key, byte[] value) {
        Headers headers = KafkaTestUtils.createGoodSequenceIdRecordHeaders(seqId, true);
        return new ConsumerRecord<>(TOPIC, 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, key, value, headers, Optional.empty());
    }

    /**
     * Builds the quota map the tests expect to be reported for {@link #PRINCIPAL}: a single
     * entry whose first two {@link QuotaConfig} constructor arguments are both {@code byteRate}.
     */
    private Map<String, QuotaConfig> singleUserQuota(long byteRate) {
        Map<String, QuotaConfig> userQuotas = new HashMap<>();
        userQuotas.put(PRINCIPAL, new QuotaConfig(Long.valueOf(byteRate), Long.valueOf(byteRate), null, null, null, QuotaConfig.UNLIMITED_QUOTA));
        return userQuotas;
    }

    /**
     * Verifies (with Mockito's default of exactly one invocation) that
     * {@code TenantQuotaCallback.updateUserQuotas} was called with the given arguments.
     */
    private void verifyQuotaUpdate(String clusterId, Map<String, QuotaConfig> userQuotas, QuotaConfig defaultQuota) {
        this.tenantQuotaCallback.verify(() -> TenantQuotaCallback.updateUserQuotas(clusterId, userQuotas, defaultQuota));
    }

    /** Verifies that {@code TenantQuotaCallback.updateUserQuotas} was never called at all. */
    private void verifyNoQuotaUpdates() {
        this.tenantQuotaCallback.verify(
                () -> TenantQuotaCallback.updateUserQuotas(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()),
                Mockito.never());
    }

    /**
     * Records without headers, with an empty key, or with an empty value must not result in
     * any {@code updateUserQuotas} call.
     */
    @Test
    public void testIgnoreBadMessages() {
        TenantClientQuotaConsumer tcqc = this.createAndStartConsumerWithMockLog(new Metrics(), new MockTime());

        // Built with the header-less ConsumerRecord constructor.
        ConsumerRecord<byte[], byte[]> noHeaders = new ConsumerRecord<>(TOPIC, 0, 0L, this.encodeClientQuotaKey("lkc-something", PRINCIPAL), this.encodeClientQuotaValue(100L, 100L));
        tcqc.consume(noHeaders);
        this.verifyNoQuotaUpdates();

        ConsumerRecord<byte[], byte[]> noKey = this.createRecord(1L, new byte[0], this.encodeClientQuotaValue(100L, 100L));
        tcqc.consume(noKey);
        this.verifyNoQuotaUpdates();

        ConsumerRecord<byte[], byte[]> noValue = this.createRecord(1L, this.encodeClientQuotaKey("lkc-something", PRINCIPAL), new byte[0]);
        tcqc.consume(noValue);
        this.verifyNoQuotaUpdates();

        this.closeConsumer(tcqc);
    }

    /**
     * Per-principal quota records: a record is expected to trigger one
     * {@code updateUserQuotas} call, a record with a lower sequence id than one already
     * consumed is expected to trigger none, and a {@code null} value is expected to report
     * {@code UNLIMITED_QUOTA} for the principal.
     */
    @Test
    public void testConsumePrincipal() {
        TenantClientQuotaConsumer tcqc = this.createAndStartConsumerWithMockLog(new Metrics(), new MockTime());
        byte[] key = this.encodeClientQuotaKey(CLUSTER_ID, PRINCIPAL);

        Map<String, QuotaConfig> uq1 = this.singleUserQuota(100L);
        tcqc.consume(this.createRecord(100L, key, this.encodeClientQuotaValue(100L, 100L)));
        this.verifyQuotaUpdate(CLUSTER_ID, uq1, QuotaConfig.UNLIMITED_QUOTA);

        // Sequence id 99 is lower than 100: the total call count must still be one.
        tcqc.consume(this.createRecord(99L, key, this.encodeClientQuotaValue(10L, 10L)));
        this.tenantQuotaCallback.verify(
                () -> TenantQuotaCallback.updateUserQuotas(ArgumentMatchers.eq(CLUSTER_ID), ArgumentMatchers.any(), ArgumentMatchers.any()),
                Mockito.times(1));

        Map<String, QuotaConfig> uq2 = this.singleUserQuota(50L);
        tcqc.consume(this.createRecord(101L, key, this.encodeClientQuotaValue(50L, 50L)));
        this.verifyQuotaUpdate(CLUSTER_ID, uq2, QuotaConfig.UNLIMITED_QUOTA);

        // A null value is expected to reset the principal to the unlimited quota.
        Map<String, QuotaConfig> uq3 = new HashMap<>();
        uq3.put(PRINCIPAL, QuotaConfig.UNLIMITED_QUOTA);
        tcqc.consume(this.createRecord(102L, key, null));
        this.verifyQuotaUpdate(CLUSTER_ID, uq3, QuotaConfig.UNLIMITED_QUOTA);

        // Sequence id 101 is lower than 102: the unlimited update must remain the only one
        // with these arguments, i.e. no call carrying the 900 rates may have been made.
        tcqc.consume(this.createRecord(101L, key, this.encodeClientQuotaValue(900L, 900L)));
        this.verifyQuotaUpdate(CLUSTER_ID, uq3, QuotaConfig.UNLIMITED_QUOTA);

        this.closeConsumer(tcqc);
    }

    /**
     * Records keyed by the {@code "<default>"} principal are expected to change the default
     * quota argument of {@code updateUserQuotas} while the per-principal map stays the same;
     * a {@code null} value is expected to restore {@code UNLIMITED_QUOTA} as the default.
     */
    @Test
    public void testConsumeDefaultQuota() {
        TenantClientQuotaConsumer tcqc = this.createAndStartConsumerWithMockLog(new Metrics(), new MockTime());
        byte[] defaultKey = this.encodeClientQuotaKey(CLUSTER_ID, "<default>");
        byte[] principalKey = this.encodeClientQuotaKey(CLUSTER_ID, PRINCIPAL);

        Map<String, QuotaConfig> uq1 = this.singleUserQuota(100L);
        tcqc.consume(this.createRecord(100L, principalKey, this.encodeClientQuotaValue(100L, 100L)));
        this.verifyQuotaUpdate(CLUSTER_ID, uq1, QuotaConfig.UNLIMITED_QUOTA);

        QuotaConfig defaultQuota = new QuotaConfig(Long.valueOf(200L), Long.valueOf(200L), null, null, null, QuotaConfig.UNLIMITED_QUOTA);
        tcqc.consume(this.createRecord(101L, defaultKey, this.encodeClientQuotaValue(200L, 200L)));
        this.verifyQuotaUpdate(CLUSTER_ID, uq1, defaultQuota);

        // The (uq1, UNLIMITED_QUOTA) call was already recorded once above; reset the mock so
        // the exactly-once verifications below only see calls made from here on.
        this.tenantQuotaCallback.reset();
        tcqc.consume(this.createRecord(102L, defaultKey, null));
        this.verifyQuotaUpdate(CLUSTER_ID, uq1, QuotaConfig.UNLIMITED_QUOTA);

        // Sequence id 101 is lower than 102: no further call is expected.
        tcqc.consume(this.createRecord(101L, defaultKey, this.encodeClientQuotaValue(900L, 900L)));
        this.verifyQuotaUpdate(CLUSTER_ID, uq1, QuotaConfig.UNLIMITED_QUOTA);

        this.closeConsumer(tcqc);
    }

    /**
     * When {@code updateUserQuotas} throws {@link IllegalArgumentException}, {@code consume}
     * must not propagate it and the consumer must still close cleanly.
     */
    @Test
    public void testIllegalArgumentException() {
        TenantClientQuotaConsumer tcqc = this.createAndStartConsumerWithMockLog(new Metrics(), new MockTime());
        byte[] key = this.encodeClientQuotaKey(CLUSTER_ID, PRINCIPAL);
        this.tenantQuotaCallback
                .when(() -> TenantQuotaCallback.updateUserQuotas(ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
                .thenThrow(new IllegalArgumentException("Your exception message"));

        ConsumerRecord<byte[], byte[]> record = this.createRecord(100L, key, this.encodeClientQuotaValue(100L, 100L));
        // Block lambda keeps the call unambiguously bound to assertDoesNotThrow(Executable).
        Assertions.assertDoesNotThrow(() -> {
            tcqc.consume(record);
        });

        this.closeConsumer(tcqc);
    }
}

