package io.debezium.connector.postgresql;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.heartbeat.DatabaseHeartbeatImpl;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.util.ContainerImageVersions;
import io.debezium.util.Testing;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

/* loaded from: input_file:io/debezium/connector/postgresql/PostgresShutdownIT.class */
public class PostgresShutdownIT extends AbstractConnectorTest {
    private static final String INSERT_STMT = "INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);";
    private static final String CREATE_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));CREATE TABLE s1.heartbeat (ts TIMESTAMP WITH TIME ZONE PRIMARY KEY);INSERT INTO s1.heartbeat (ts) VALUES (NOW());";
    private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));CREATE TABLE s1.heartbeat (ts TIMESTAMP WITH TIME ZONE PRIMARY KEY);INSERT INTO s1.heartbeat (ts) VALUES (NOW());INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);";
    private String oldContainerPort;
    private static final Logger LOGGER = LoggerFactory.getLogger(PostgresShutdownIT.class);
    private static final String POSTGRES_IMAGE = ContainerImageVersions.getStableImage("debezium/example-postgres");
    private static final DockerImageName POSTGRES_DOCKER_IMAGE_NAME = DockerImageName.parse(POSTGRES_IMAGE).asCompatibleSubstituteFor("postgres");
    public static PostgreSQLContainer<?> postgresContainer = new PostgreSQLContainer(POSTGRES_DOCKER_IMAGE_NAME).withDatabaseName("postgres").withUsername("postgres").withPassword("postgres").withLogConsumer(new Slf4jLogConsumer(LOGGER)).withNetworkAliases(new String[]{"postgres"});

    @Before
    public void setUp() {
        postgresContainer.start();
        this.oldContainerPort = System.getProperty("database.port", "5432");
        System.setProperty("database.port", String.valueOf(postgresContainer.getMappedPort(5432)));
        try {
            TestHelper.dropAllSchemas();
            initializeConnectorTestFramework();
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }

    @After
    public void tearDown() {
        stopConnector();
        postgresContainer.stop();
        System.setProperty("database.port", this.oldContainerPort);
    }

    @Test
    @FixFor({"DBZ-2617"})
    public void shouldStopOnPostgresFastShutdown() throws Exception {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        for (int i = 0; i < 99; i++) {
            TestHelper.execute(INSERT_STMT, new String[0]);
        }
        Configuration.Builder with = TestHelper.defaultConfig().with("database." + JdbcConfiguration.PORT, postgresContainer.getMappedPort(5432)).with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.ALWAYS.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false).with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1").with(Heartbeat.HEARTBEAT_INTERVAL, 500).with(DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY, "UPDATE s1.heartbeat SET ts=NOW();");
        Testing.Print.enable();
        PostgresConnection create = TestHelper.create();
        String str = (String) create.queryAndMap("SELECT ts FROM s1.heartbeat;", create.singleResultMapper(resultSet -> {
            return resultSet.getString("ts");
        }, "Could not fetch keepalive info"));
        start(PostgresConnector.class, with.build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("postgres", "test_server");
        waitForStreamingRunning("postgres", "test_server");
        this.logger.info("Waiting for heartbeats...");
        Awaitility.await().pollInterval(250L, TimeUnit.MILLISECONDS).atMost(5 * TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(!str.equals(create.queryAndMap("SELECT ts FROM s1.heartbeat;", create.singleResultMapper(resultSet2 -> {
                return resultSet2.getString("ts");
            }, "Could not fetch keepalive info"))));
        });
        this.logger.info("INTIAL Heartbeat: " + str + " ; CURRENT heartbeat: " + ((String) create.queryAndMap("SELECT ts FROM s1.heartbeat;", create.singleResultMapper(resultSet2 -> {
            return resultSet2.getString("ts");
        }, "Could not fetch keepalive info"))));
        this.logger.info("Execute Postgres shutdown...");
        this.logger.info(postgresContainer.execInContainer(new String[]{"su", "-", "postgres", "-c", "/usr/lib/postgresql/11/bin/pg_ctl -m fast -D /var/lib/postgresql/data stop"}).toString());
        this.logger.info("Waiting for Postgres to shut down...");
        waitForPostgresShutdown();
        this.logger.info("Waiting for connector to shut down...");
        waitForConnectorShutdown("postgres", "test_server");
    }

    private void waitForPostgresShutdown() {
        Awaitility.await().pollInterval(200L, TimeUnit.MILLISECONDS).atMost(60 * TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(!postgresContainer.isRunning());
        });
    }
}
