package io.confluent.parallelconsumer.examples.metrics;

import io.confluent.csid.utils.LongPollingMockConsumer;
import java.io.BufferedInputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import pl.tlinkowski.unij.api.UniLists;

@Testcontainers
/* loaded from: input_file:io/confluent/parallelconsumer/examples/metrics/CoreAppMetricsIntegrationTest.class */
public class CoreAppMetricsIntegrationTest {
    private static final Logger log = LoggerFactory.getLogger(CoreAppMetricsIntegrationTest.class);

    @Container
    private static final PrometheusContainer PROMETHEUS_CONTAINER = new PrometheusContainer();

    /* loaded from: input_file:io/confluent/parallelconsumer/examples/metrics/CoreAppMetricsIntegrationTest$CoreAppUnderTest.class */
    class CoreAppUnderTest extends CoreApp {
        LongPollingMockConsumer<String, String> mockConsumer = (LongPollingMockConsumer) Mockito.spy(new LongPollingMockConsumer(OffsetResetStrategy.EARLIEST));

        CoreAppUnderTest() {
        }

        Consumer<String, String> getKafkaConsumer() {
            Mockito.when(this.mockConsumer.groupMetadata()).thenReturn(new ConsumerGroupMetadata("groupid"));
            return this.mockConsumer;
        }

        protected void postSetup() {
            super.postSetup();
            this.mockConsumer.subscribeWithRebalanceAndAssignment(UniLists.of(this.inputTopic), 1);
        }
    }

    @Test
    void testMetrics() {
        org.testcontainers.Testcontainers.exposeHostPorts(new int[]{7001});
        CoreAppUnderTest coreAppUnderTest = new CoreAppUnderTest();
        List of = UniLists.of("pc_status", "pc_partitions_number", "pc_incomplete_offsets_total", "pc_user_function_processing_time_seconds");
        coreAppUnderTest.run();
        coreAppUnderTest.mockConsumer.addRecord(new ConsumerRecord(coreAppUnderTest.inputTopic, 0, 0L, "a key 1", "a value"));
        coreAppUnderTest.mockConsumer.addRecord(new ConsumerRecord(coreAppUnderTest.inputTopic, 0, 1L, "a key 2", "a value"));
        coreAppUnderTest.mockConsumer.addRecord(new ConsumerRecord(coreAppUnderTest.inputTopic, 0, 2L, "a key 3", "a value"));
        Awaitility.await().pollDelay(Duration.ofSeconds(1L)).untilAsserted(() -> {
            Assert.assertTrue(getPrometheusMetrics().containsAll(of));
        });
        coreAppUnderTest.close();
    }

    private Set<String> getPrometheusMetrics() {
        ObjectMapper objectMapper = new ObjectMapper();
        HttpURLConnection httpURLConnection = (HttpURLConnection) new URL(String.format("%s/api/v1/metadata", PROMETHEUS_CONTAINER.getPrometheusEndpoint())).openConnection();
        Assert.assertEquals(httpURLConnection.getResponseCode(), 200L);
        return ((Map) ((Map) objectMapper.readValue(new BufferedInputStream(httpURLConnection.getInputStream()), Map.class)).get("data")).keySet();
    }
}
