package io.debezium.connector.postgresql;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenJavaVersion;
import io.debezium.pipeline.AbstractMetricsTest;
import java.lang.management.ManagementFactory;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/postgresql/PostgresMetricsIT.class */
public class PostgresMetricsIT extends AbstractMetricsTest<PostgresConnector> {
    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresMetricsIT.class);
    private static final String INIT_STATEMENTS = "CREATE TABLE simple (pk SERIAL NOT NULL, val INT NOT NULL, PRIMARY KEY(pk)); ALTER TABLE simple REPLICA IDENTITY FULL;";
    private static final String INSERT_STATEMENTS = "INSERT INTO simple (val) VALUES (25); INSERT INTO simple (val) VALUES (50);";

    protected Class<PostgresConnector> getConnectorClass() {
        return PostgresConnector.class;
    }

    protected String connector() {
        return "postgres";
    }

    protected String server() {
        return TestHelper.TEST_SERVER;
    }

    protected Configuration.Builder config() {
        return TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.ALWAYS).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE);
    }

    protected Configuration.Builder noSnapshot(Configuration.Builder builder) {
        return builder.with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA);
    }

    protected void executeInsertStatements() {
        TestHelper.execute(INSERT_STATEMENTS, new String[0]);
    }

    protected String tableName() {
        return "public.simple";
    }

    protected long expectedEvents() {
        return 2L;
    }

    protected boolean snapshotCompleted() {
        return false;
    }

    @Before
    public void before() throws Exception {
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropAllSchemas();
        TestHelper.execute(INIT_STATEMENTS, new String[0]);
    }

    @After
    public void after() throws Exception {
        stopConnector();
    }

    @Test
    @SkipWhenJavaVersion(check = EqualityCheck.GREATER_THAN_OR_EQUAL, value = 16, description = "Deep reflection not allowed by default on this Java version")
    public void oneRecordInQueue() throws Exception {
        MBeanServer platformMBeanServer = ManagementFactory.getPlatformMBeanServer();
        executeInsertStatements();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CountDownLatch countDownLatch2 = new CountDownLatch(1);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 10).with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1).with(PostgresConnectorConfig.POLL_INTERVAL_MS, 100L).with(PostgresConnectorConfig.MAX_QUEUE_SIZE_IN_BYTES, 10000L).build(), loggingCompletion(), null, sourceRecord -> {
            LOGGER.info("Record '{}' arrived", sourceRecord);
            countDownLatch.countDown();
            try {
                countDownLatch2.await(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS);
                LOGGER.info("Record processing completed");
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }, true);
        waitForStreamingRunning(connector(), server());
        executeInsertStatements();
        LOGGER.info("Waiting for the first record to arrive");
        countDownLatch.await(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS);
        LOGGER.info("First record arrived");
        Awaitility.await().alias("MBean attribute was not an expected value").pollInterval(100L, TimeUnit.MILLISECONDS).atMost(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS).ignoreException(InstanceNotFoundException.class).until(() -> {
            return Boolean.valueOf(((Long) platformMBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes")).longValue() > 0);
        });
        Assertions.assertThat(platformMBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes")).isNotEqualTo(0L);
        LOGGER.info("Wait for the queue to contain second record");
        Awaitility.await().alias("MBean attribute was not an expected value").pollInterval(100L, TimeUnit.MILLISECONDS).atMost(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS).ignoreException(InstanceNotFoundException.class).until(() -> {
            return Boolean.valueOf(((Integer) platformMBeanServer.getAttribute(getStreamingMetricsObjectName(), "QueueRemainingCapacity")).intValue() == 9);
        });
        LOGGER.info("Wait for second record to be in queue");
        Assertions.assertThat(platformMBeanServer.getAttribute(getStreamingMetricsObjectName(), "QueueRemainingCapacity")).isEqualTo(9);
        LOGGER.info("Empty queue");
        countDownLatch2.countDown();
        LOGGER.info("Wait for queue to be empty");
        Awaitility.await().alias("MBean attribute was not an expected value").pollInterval(100L, TimeUnit.MILLISECONDS).atMost(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS).ignoreException(InstanceNotFoundException.class).until(() -> {
            return Boolean.valueOf(((Long) platformMBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes")).longValue() == 0);
        });
        Assertions.assertThat(platformMBeanServer.getAttribute(getStreamingMetricsObjectName(), "CurrentQueueSizeInBytes")).isEqualTo(0L);
        stopConnector();
    }
}
