package io.confluent.kafkarest.integration.v3;

import io.confluent.kafkarest.entities.v2.BinaryPartitionProduceRequest;
import io.confluent.kafkarest.entities.v3.ConsumerLagData;
import io.confluent.kafkarest.entities.v3.ConsumerLagDataList;
import io.confluent.kafkarest.entities.v3.GetConsumerLagResponse;
import io.confluent.kafkarest.entities.v3.ListConsumerLagsResponse;
import io.confluent.kafkarest.entities.v3.Resource;
import io.confluent.kafkarest.entities.v3.ResourceCollection;
import io.confluent.kafkarest.integration.ClusterTestHarness;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:io/confluent/kafkarest/integration/v3/ConsumerLagsResourceIntegrationTest.class */
public class ConsumerLagsResourceIntegrationTest extends ClusterTestHarness {
    private static final String topic1 = "topic-1";
    private static final String topic2 = "topic-2";
    private static final String[] topics = {topic1, topic2};
    private static final String group1 = "consumer-group-1";
    private static final int numTopics = 2;
    private static final int numPartitions = 2;
    private String baseUrl;
    private String clusterId;
    private final List<BinaryPartitionProduceRequest.BinaryPartitionProduceRecord> partitionRecordsWithoutKeys = Arrays.asList(new BinaryPartitionProduceRequest.BinaryPartitionProduceRecord((String) null, "value"), new BinaryPartitionProduceRequest.BinaryPartitionProduceRecord((String) null, "value2"), new BinaryPartitionProduceRequest.BinaryPartitionProduceRecord((String) null, "value3"));

    @Override // io.confluent.kafkarest.integration.ClusterTestHarness
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.baseUrl = this.restConnect;
        this.clusterId = getClusterId();
        createTopic(topic1, 2, (short) 1);
        createTopic(topic2, 2, (short) 1);
    }

    @Test
    public void listConsumerLags_returnsConsumerLags() {
        KafkaConsumer<?, ?> createConsumer = createConsumer(group1, "client-1");
        KafkaConsumer<?, ?> createConsumer2 = createConsumer(group1, "client-2");
        createConsumer.subscribe(Collections.singletonList(topic1));
        createConsumer2.subscribe(Collections.singletonList(topic2));
        createConsumer.poll(Duration.ofSeconds(1L));
        createConsumer2.poll(Duration.ofSeconds(1L));
        createConsumer.poll(Duration.ofSeconds(1L));
        createConsumer2.poll(Duration.ofSeconds(1L));
        BinaryPartitionProduceRequest create = BinaryPartitionProduceRequest.create(this.partitionRecordsWithoutKeys);
        produce(topic1, 0, create);
        produce(topic2, 1, create);
        createConsumer.poll(Duration.ofSeconds(1L));
        createConsumer2.poll(Duration.ofSeconds(1L));
        createConsumer.commitSync();
        createConsumer2.commitSync();
        long[][] jArr = new long[2][2];
        jArr[0][0] = 3;
        jArr[1][1] = 3;
        Response response = request("/v3/clusters/" + this.clusterId + "/consumer-groups/" + group1 + "/lags").accept(new String[]{"application/json"}).get();
        Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
        ConsumerLagDataList value = ((ListConsumerLagsResponse) response.readEntity(ListConsumerLagsResponse.class)).getValue();
        for (int i = 0; i < 2; i++) {
            for (int i2 = 0; i2 < 2; i2++) {
                int i3 = i;
                int i4 = i2;
                ConsumerLagData consumerLagData = (ConsumerLagData) value.getData().stream().filter(consumerLagData2 -> {
                    return consumerLagData2.getTopicName().equals(topics[i3]);
                }).filter(consumerLagData3 -> {
                    return consumerLagData3.getPartitionId() == i4;
                }).findAny().get();
                Assert.assertEquals(jArr[i][i2], consumerLagData.getCurrentOffset().longValue());
                Assert.assertEquals(jArr[i][i2], consumerLagData.getLogEndOffset().longValue());
                Assert.assertEquals(0L, consumerLagData.getLag().longValue());
            }
        }
        produce(topic2, 1, BinaryPartitionProduceRequest.create(this.partitionRecordsWithoutKeys));
        Assert.assertEquals(ListConsumerLagsResponse.create(ConsumerLagDataList.builder().setMetadata(ResourceCollection.Metadata.builder().setSelf(this.baseUrl + "/v3/clusters/" + this.clusterId + "/consumer-groups/" + group1 + "/lags").build()).setData(Arrays.asList(expectedConsumerLagData(topic2, 1, createConsumer2.groupMetadata().memberId(), "client-2", 3L, 6L), expectedConsumerLagData(topic1, 0, createConsumer.groupMetadata().memberId(), "client-1", 3L, 3L), expectedConsumerLagData(topic1, 1, createConsumer.groupMetadata().memberId(), "client-1", 0L, 0L), expectedConsumerLagData(topic2, 0, createConsumer2.groupMetadata().memberId(), "client-2", 0L, 0L))).build()), request("/v3/clusters/" + this.clusterId + "/consumer-groups/" + group1 + "/lags").accept(new String[]{"application/json"}).get().readEntity(ListConsumerLagsResponse.class));
    }

    @Test
    public void listConsumerLags_nonExistingConsumerGroup_returnsNotFound() {
        KafkaConsumer<?, ?> createConsumer = createConsumer(group1, "client-1");
        createConsumer.subscribe(Collections.singletonList(topic1));
        produce(topic1, 0, BinaryPartitionProduceRequest.create(this.partitionRecordsWithoutKeys));
        createConsumer.poll(Duration.ofSeconds(1L));
        createConsumer.commitSync();
        Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), request("/v3/clusters/" + this.clusterId + "/consumer-groups/foo/lags").accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void getConsumerLag_returnsConsumerLag() {
        KafkaConsumer<?, ?> createConsumer = createConsumer(group1, "client-1");
        createConsumer.subscribe(Arrays.asList(topic1, topic2));
        BinaryPartitionProduceRequest create = BinaryPartitionProduceRequest.create(this.partitionRecordsWithoutKeys);
        produce(topic1, 0, create);
        produce(topic2, 1, create);
        createConsumer.poll(Duration.ofSeconds(1L));
        createConsumer.commitSync();
        long[][] jArr = new long[2][2];
        jArr[0][0] = 3;
        jArr[1][1] = 3;
        for (int i = 0; i < 2; i++) {
            for (int i2 = 0; i2 < 2; i2++) {
                createConsumer.seekToEnd(Collections.singletonList(new TopicPartition(topics[i], i2)));
                Response response = request("/v3/clusters/" + this.clusterId + "/consumer-groups/" + group1 + "/lags/" + topics[i] + "/partitions/" + i2).accept(new String[]{"application/json"}).get();
                Assert.assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
                ConsumerLagData value = ((GetConsumerLagResponse) response.readEntity(GetConsumerLagResponse.class)).getValue();
                Assert.assertEquals(jArr[i][i2], value.getCurrentOffset().longValue());
                Assert.assertEquals(jArr[i][i2], value.getLogEndOffset().longValue());
                Assert.assertEquals(0L, value.getLag().longValue());
            }
        }
        produce(topic2, 1, BinaryPartitionProduceRequest.create(this.partitionRecordsWithoutKeys));
        Assert.assertEquals(GetConsumerLagResponse.create(ConsumerLagData.builder().setMetadata(Resource.Metadata.builder().setSelf(this.baseUrl + "/v3/clusters/" + this.clusterId + "/consumer-groups/" + group1 + "/lags/" + topic2 + "/partitions/1").setResourceName("crn:///kafka=" + this.clusterId + "/consumer-group=" + group1 + "/lag=" + topic2 + "/partition=1").build()).setClusterId(this.clusterId).setConsumerGroupId(group1).setTopicName(topic2).setPartitionId(1).setConsumerId(createConsumer.groupMetadata().memberId()).setClientId("client-1").setCurrentOffset(3L).setLogEndOffset(6L).setLag(3L).build()), request("/v3/clusters/" + this.clusterId + "/consumer-groups/" + group1 + "/lags/" + topic2 + "/partitions/1").accept(new String[]{"application/json"}).get().readEntity(GetConsumerLagResponse.class));
    }

    @Test
    public void getConsumerLag_nonExistingOffsets_returnsNotFound() {
        createConsumer(group1, "client-1").subscribe(Collections.singletonList(topic1));
        BinaryPartitionProduceRequest create = BinaryPartitionProduceRequest.create(this.partitionRecordsWithoutKeys);
        produce(topic1, 0, create);
        produce(topic2, 1, create);
        Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), request("/v3/clusters/" + this.clusterId + "/consumer-groups/" + group1 + "/lags/" + topic1 + "/partitions/0").accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void getConsumerLag_nonExistingConsumerGroup_returnsNotFound() {
        KafkaConsumer<?, ?> createConsumer = createConsumer(group1, "client-1");
        createConsumer.subscribe(Collections.singletonList(topic1));
        produce(topic1, 0, BinaryPartitionProduceRequest.create(this.partitionRecordsWithoutKeys));
        createConsumer.poll(Duration.ofSeconds(1L));
        createConsumer.commitSync();
        Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), request("/v3/clusters/" + this.clusterId + "/consumer-groups/foo/lags/" + topic1 + "/partitions/0").accept(new String[]{"application/json"}).get().getStatus());
    }

    @Test
    public void getConsumerLag_nonExistingCluster_returnsNotFound() {
        KafkaConsumer<?, ?> createConsumer = createConsumer(group1, "client-1");
        createConsumer.subscribe(Collections.singletonList(topic1));
        produce(topic1, 0, BinaryPartitionProduceRequest.create(this.partitionRecordsWithoutKeys));
        createConsumer.poll(Duration.ofSeconds(1L));
        createConsumer.commitSync();
        Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), request("/v3/clusters/foo/consumer-groups/consumer-group-1/lags/topic-1/partitions/0").accept(new String[]{"application/json"}).get().getStatus());
    }

    private KafkaConsumer<?, ?> createConsumer(String str, String str2) {
        Properties consumerProperties = this.restConfig.getConsumerProperties();
        consumerProperties.put("bootstrap.servers", this.brokerList);
        consumerProperties.put("group.id", str);
        consumerProperties.put("client.id", str2);
        consumerProperties.put("enable.auto.commit", false);
        return new KafkaConsumer<>(consumerProperties, new BytesDeserializer(), new BytesDeserializer());
    }

    private void produce(String str, int i, BinaryPartitionProduceRequest binaryPartitionProduceRequest) {
        request("topics/" + str + "/partitions/" + i, Collections.emptyMap()).post(Entity.entity(binaryPartitionProduceRequest, "application/vnd.kafka.binary.v2+json"));
    }

    private ConsumerLagData expectedConsumerLagData(String str, int i, String str2, String str3, long j, long j2) {
        return ConsumerLagData.builder().setMetadata(Resource.Metadata.builder().setSelf(this.baseUrl + "/v3/clusters/" + this.clusterId + "/consumer-groups/" + group1 + "/lags/" + str + "/partitions/" + i).setResourceName("crn:///kafka=" + this.clusterId + "/consumer-group=" + group1 + "/lag=" + str + "/partition=" + i).build()).setClusterId(this.clusterId).setConsumerGroupId(group1).setTopicName(str).setPartitionId(i).setConsumerId(str2).setClientId(str3).setCurrentOffset(Long.valueOf(j)).setLogEndOffset(Long.valueOf(j2)).setLag(Long.valueOf(j2 - j)).build();
    }
}
