package org.apache.kafka.server.link;

import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.message.MetadataRequestData;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.KafkaChannelTest;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestInternals;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.interceptor.BrokerInterceptor;
import org.apache.kafka.server.interceptor.DefaultBrokerInterceptor;
import org.apache.kafka.server.metrics.ApiSensorBuilder;
import org.apache.kafka.server.metrics.ApiSensors;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/server/link/ClusterLinkMetricsTest.class */
public class ClusterLinkMetricsTest {
    private Metrics metrics;
    private Time time;
    private BrokerInterceptor interceptor;

    @Before
    public void setUp() {
        this.metrics = new Metrics();
        this.time = new MockTime(0L);
        this.interceptor = createInterceptor(this.time);
    }

    @After
    public void tearDown() {
        this.metrics.close();
    }

    @Test
    public void testMetrics() {
        UUID randomUUID = UUID.randomUUID();
        ApiKeys apiKeys = ApiKeys.METADATA;
        short latestVersion = ApiKeys.METADATA.latestVersion();
        RequestHeaderData requestApiVersion = new RequestHeaderData().setClusterLinkId(randomUUID).setClientId("clientId").setCorrelationId(1).setRequestApiKey(apiKeys.id).setRequestApiVersion(latestVersion);
        RequestContext requestContext = requestContext(requestApiVersion);
        MetadataRequest metadataRequest = new MetadataRequest(new MetadataRequestData(), latestVersion);
        requestContext.parseRequest(toByteBuffer(metadataRequest));
        Double linkMetricValue = linkMetricValue("request-total", randomUUID, apiKeys);
        Assert.assertNotNull("Request metrics not found", linkMetricValue);
        Assert.assertEquals(1.0d, linkMetricValue.doubleValue(), 0.001d);
        Double linkMetricValue2 = linkMetricValue("request-byte-total", randomUUID, apiKeys);
        Assert.assertNotNull("Request bytes metrics not found", linkMetricValue2);
        Assert.assertTrue("Request bytes not recorded", linkMetricValue2.doubleValue() > 0.0d);
        MetadataResponseData brokers = new MetadataResponseData().setClusterId("clusterId").setControllerId(0).setBrokers(new MetadataResponseData.MetadataResponseBrokerCollection());
        MetadataResponse metadataResponse = new MetadataResponse(brokers);
        this.time.sleep(1L);
        requestContext.buildResponse(metadataResponse);
        Double linkMetricValue3 = linkMetricValue("response-byte-total", randomUUID, apiKeys);
        Assert.assertNotNull("Response bytes metrics not found", linkMetricValue3);
        Assert.assertTrue("Response bytes not recorded", linkMetricValue3.doubleValue() > 0.0d);
        Double linkMetricValue4 = linkMetricValue("response-time-ns-max", randomUUID, apiKeys);
        Assert.assertNotNull("Response time metrics not found", linkMetricValue4);
        Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(1L), linkMetricValue4.doubleValue(), 0.001d);
        RequestContext requestContext2 = requestContext(new RequestHeaderData().setClientId("clientId").setCorrelationId(2).setRequestApiKey(apiKeys.id).setRequestApiVersion(latestVersion));
        requestContext2.parseRequest(toByteBuffer(metadataRequest));
        Double linkMetricValue5 = linkMetricValue("request-total", randomUUID, apiKeys);
        Assert.assertNotNull("Request metrics not found", linkMetricValue5);
        Assert.assertEquals(2.0d, linkMetricValue5.doubleValue(), 0.001d);
        Double linkMetricValue6 = linkMetricValue("request-byte-total", randomUUID, apiKeys);
        Assert.assertNotNull("Request bytes metrics not found", linkMetricValue6);
        Assert.assertTrue("Request bytes not recorded", linkMetricValue6.doubleValue() > linkMetricValue2.doubleValue());
        this.time.sleep(2L);
        requestContext2.buildResponse(metadataResponse);
        Double linkMetricValue7 = linkMetricValue("response-byte-total", randomUUID, apiKeys);
        Assert.assertNotNull("Response bytes metrics not found", linkMetricValue7);
        Assert.assertTrue("Response bytes not recorded", linkMetricValue7.doubleValue() > linkMetricValue3.doubleValue());
        Double linkMetricValue8 = linkMetricValue("response-time-ns-max", randomUUID, apiKeys);
        Assert.assertNotNull("Response time metrics not found", linkMetricValue8);
        Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(2L), linkMetricValue8.doubleValue(), 0.001d);
        RequestContext requestContext3 = requestContext(requestApiVersion);
        MetadataResponseData.MetadataResponseTopicCollection metadataResponseTopicCollection = new MetadataResponseData.MetadataResponseTopicCollection();
        metadataResponseTopicCollection.add(new MetadataResponseData.MetadataResponseTopic().setName(responseTopic()).setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code()));
        MetadataResponse metadataResponse2 = new MetadataResponse(brokers.setTopics(metadataResponseTopicCollection));
        this.time.sleep(3L);
        requestContext3.buildResponse(metadataResponse2);
        Double linkMetricValue9 = linkMetricValue("response-byte-total", randomUUID, apiKeys);
        Assert.assertNotNull("Response bytes metrics not found", linkMetricValue9);
        Assert.assertTrue("Response bytes not recorded", linkMetricValue9.doubleValue() > linkMetricValue7.doubleValue());
        Double linkMetricValue10 = linkMetricValue("response-time-ns-max", randomUUID, apiKeys);
        Assert.assertNotNull("Response time metrics not found", linkMetricValue10);
        Assert.assertEquals(TimeUnit.MILLISECONDS.toNanos(3L), linkMetricValue10.doubleValue(), 0.001d);
        Double linkMetricValue11 = linkMetricValue("error-total", randomUUID, apiKeys);
        Assert.assertNotNull("Error metrics not found", linkMetricValue11);
        Assert.assertEquals(1.0d, linkMetricValue11.doubleValue(), 0.001d);
        verifyMetrics((Set) this.metrics.metrics().keySet().stream().filter(metricName -> {
            return metricName.tags().containsKey("link-id");
        }).collect(Collectors.toSet()));
    }

    @Test
    public void testApiSensorsReinitialized() {
        ClusterLinkMetrics clusterLinkMetrics = new ClusterLinkMetrics(this.metrics, UUID.randomUUID());
        MockTime mockTime = new MockTime();
        ApiKeys apiKeys = ApiKeys.API_VERSIONS;
        clusterLinkMetrics.recordResponse(apiKeys, 100L, mockTime.nanoseconds(), Collections.emptyMap(), mockTime.milliseconds());
        Map singletonMap = Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 1);
        clusterLinkMetrics.recordResponse(apiKeys, 100L, mockTime.nanoseconds(), singletonMap, mockTime.milliseconds());
        ApiSensors apiSensors = clusterLinkMetrics.apiSensors(apiKeys);
        Assert.assertEquals(Collections.emptySet(), apiSensors.errorsWithoutSensors(this.metrics, singletonMap.keySet()));
        ApiSensorBuilder apiSensorBuilder = new ApiSensorBuilder(this.metrics, clusterLinkMetrics.builderContext(), apiKeys);
        Map orCreateSuffixedSensors = apiSensorBuilder.getOrCreateSuffixedSensors();
        Map orCreateErrorSensors = apiSensorBuilder.getOrCreateErrorSensors(singletonMap.keySet());
        Assert.assertFalse(apiSensors.requestSensorsExpired(this.metrics));
        orCreateErrorSensors.values().forEach(sensor -> {
            this.metrics.removeSensor(sensor.name());
        });
        Assert.assertEquals(singletonMap.keySet(), clusterLinkMetrics.apiSensors(apiKeys).errorsWithoutSensors(this.metrics, singletonMap.keySet()));
        orCreateSuffixedSensors.values().forEach(sensor2 -> {
            this.metrics.removeSensor(sensor2.name());
        });
        Assert.assertTrue(apiSensors.requestSensorsExpired(this.metrics));
        Assert.assertTrue(apiSensors.responseSensorsExpired(this.metrics));
        clusterLinkMetrics.recordResponse(apiKeys, 100L, mockTime.nanoseconds(), Collections.emptyMap(), mockTime.milliseconds());
        ApiSensors apiSensors2 = clusterLinkMetrics.apiSensors(apiKeys);
        Assert.assertNotEquals(apiSensors, apiSensors2);
        Assert.assertFalse(apiSensors2.requestSensorsExpired(this.metrics));
        Assert.assertFalse(apiSensors2.responseSensorsExpired(this.metrics));
    }

    protected BrokerInterceptor createInterceptor(Time time) {
        return new DefaultBrokerInterceptor(time);
    }

    protected KafkaPrincipal createPrincipal() {
        return new KafkaPrincipal("User", "user");
    }

    protected String responseTopic() {
        return "topic";
    }

    protected void verifyMetrics(Set<MetricName> set) {
        for (MetricName metricName : set) {
            String str = (String) metricName.tags().get("request");
            if (str != null) {
                Assert.assertEquals(ApiKeys.METADATA.name, str);
            }
            String str2 = (String) metricName.tags().get("error");
            if (str2 != null) {
                Assert.assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.name(), str2);
            }
        }
    }

    private RequestContext requestContext(RequestHeaderData requestHeaderData) {
        return this.interceptor.newContext(new RequestHeader(requestHeaderData, (short) 2), KafkaChannelTest.CHANNEL_ID, InetAddress.getLoopbackAddress(), createPrincipal(), ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, (ClientInformation) null, this.metrics);
    }

    private ByteBuffer toByteBuffer(AbstractRequest abstractRequest) {
        Struct struct = RequestInternals.toStruct(abstractRequest);
        ByteBuffer allocate = ByteBuffer.allocate(struct.sizeOf());
        struct.writeTo(allocate);
        allocate.flip();
        return allocate;
    }

    private Double linkMetricValue(String str, UUID uuid, ApiKeys apiKeys) {
        for (Map.Entry entry : this.metrics.metrics().entrySet()) {
            MetricName metricName = (MetricName) entry.getKey();
            if ("cluster-link-source-metrics".equals(metricName.group()) && str.equals(metricName.name()) && uuid.toString().equals(metricName.tags().get("link-id")) && apiKeys.name.equals(metricName.tags().get("request"))) {
                return Double.valueOf(((Double) ((KafkaMetric) entry.getValue()).metricValue()).doubleValue());
            }
        }
        return null;
    }
}
