/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.quota;

import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.TenantMetadata;
import io.confluent.kafka.multitenant.quota.QuotaConfig;
import io.confluent.kafka.multitenant.quota.TenantQuotaCallback;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kafka.network.RequestChannel;
import kafka.server.ClientQuotaManager;
import kafka.server.KafkaConfig;
import kafka.server.QuotaFactory;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Option;

public class MultiTenantClientQuotaManagerTest {
    private static final Long MIN_BROKER_CONSUME_QUOTA = 500L;
    private static final Long MIN_BROKER_PRODUCE_QUOTA = 500L;
    private static final Long MAX_BROKER_CONSUME_QUOTA = 1200L;
    private static final Long MAX_BROKER_PRODUCE_QUOTA = 600L;
    private static final Long MAX_BROKER_PRODUCE_CAPACITY = 1000L;
    private static final Double DEFAULT_CONTROLLER_QUOTA = 20.0;
    private static final Long TENANT_1_PRODUCE_BYTE_RATE = 500L;
    private static final Long TENANT_1_CONSUME_BYTE_RATE = 1000L;
    private final Time time = new MockTime();
    private Metrics metrics;
    private QuotaFactory.QuotaManagers quotaManagers;

    @BeforeEach
    public void setup() {
        this.metrics = new Metrics(this.time);
        this.quotaManagers = this.createQuotaManagers();
        this.createTenantQuotas();
    }

    @AfterEach
    public void tearDown() {
        this.quotaManagers.shutdown();
        TenantQuotaCallback.closeAll();
        this.metrics.close();
    }

    @Test
    public void testOnlyTenantPrincipalsTrackedInActiveTenants() {
        ClientQuotaManager produce = this.quotaManagers.produce();
        MultiTenantPrincipal tenantPrincipal = new MultiTenantPrincipal("userA", new TenantMetadata.Builder("tenant1", "sa-a").build());
        Assertions.assertTrue((boolean)produce.backpressureEnabled());
        Assertions.assertEquals((double)MAX_BROKER_PRODUCE_CAPACITY.longValue(), (double)produce.getBrokerQuotaLimit(), (double)0.01);
        produce.maybeRecordAndGetThrottleTimeMs(this.createSession((KafkaPrincipal)tenantPrincipal), "", 500.0, this.time.milliseconds());
        produce.maybeRecordAndGetThrottleTimeMs(this.createSession(KafkaPrincipal.ANONYMOUS), "", 1500.0, this.time.milliseconds());
        produce.maybeAutoTuneQuota();
        Assertions.assertEquals((double)9.223372036854776E18, (double)produce.dynamicQuota(KafkaPrincipal.ANONYMOUS, "").bound(), (double)0.01);
        Assertions.assertEquals((double)TENANT_1_PRODUCE_BYTE_RATE.longValue(), (double)produce.dynamicQuota((KafkaPrincipal)tenantPrincipal, "").bound(), (double)0.01);
    }

    private QuotaFactory.QuotaManagers createQuotaManagers() {
        Properties props = new Properties();
        props.put(KafkaConfig.ZkConnectProp(), TestUtils.MockZkConnect());
        props.put(KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:9092");
        props.put(KafkaConfig.NumQuotaSamplesProp(), String.valueOf(2));
        props.putAll(this.quotaCallbackProps());
        KafkaConfig config = KafkaConfig.fromProps((Properties)props);
        return QuotaFactory.instantiate((KafkaConfig)config, (Metrics)this.metrics, (Time)this.time, (String)"", (Option)Option.empty());
    }

    private void createTenantQuotas() {
        HashMap<String, QuotaConfig> tenantQuotas = new HashMap<String, QuotaConfig>();
        tenantQuotas.put("tenant1", this.quotaConfig(TENANT_1_PRODUCE_BYTE_RATE, TENANT_1_CONSUME_BYTE_RATE, 300.0));
        tenantQuotas.put("tenant2", this.quotaConfig(2000L, 3000L, 400.0));
        TenantQuotaCallback.updateQuotas(tenantQuotas, (QuotaConfig)QuotaConfig.UNLIMITED_QUOTA);
    }

    private Map<String, Object> quotaCallbackProps() {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put(KafkaConfig.ClientQuotaCallbackClassProp(), TenantQuotaCallback.class.getName());
        configs.put("confluent.broker.limit.producer.bytes.per.second", MAX_BROKER_PRODUCE_CAPACITY.toString());
        configs.put("confluent.backpressure.types", "produce");
        configs.put("broker.id", String.valueOf(1));
        configs.put("confluent.quota.tenant.follower.broker.min.producer.rate", MIN_BROKER_PRODUCE_QUOTA.toString());
        configs.put("confluent.quota.tenant.broker.max.producer.rate", MAX_BROKER_PRODUCE_QUOTA.toString());
        configs.put("confluent.quota.tenant.follower.broker.min.consumer.rate", MIN_BROKER_CONSUME_QUOTA.toString());
        configs.put("confluent.quota.tenant.broker.max.consumer.rate", MAX_BROKER_CONSUME_QUOTA.toString());
        configs.put("confluent.quota.tenant.default.controller.mutation.rate", DEFAULT_CONTROLLER_QUOTA.toString());
        return configs;
    }

    private QuotaConfig quotaConfig(long producerByteRate, long consumerByteRate, double requestPercentage) {
        return new QuotaConfig(Long.valueOf(producerByteRate), Long.valueOf(consumerByteRate), Double.valueOf(requestPercentage), null, null, QuotaConfig.UNLIMITED_QUOTA);
    }

    private RequestChannel.Session createSession(KafkaPrincipal principal) {
        return new RequestChannel.Session(principal, null);
    }
}

