package io.debezium.connector.postgresql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.AbstractBlockingSnapshotTest;
import io.debezium.pipeline.signal.actions.AbstractSnapshotSignal;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import java.sql.SQLException;
import java.util.List;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/postgresql/BlockingSnapshotIT.class */
public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest {
    private static final String TOPIC_NAME = "test_server.s1.a";
    private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1;CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s1.b (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048))";

    @Before
    public void before() throws SQLException {
        TestHelper.dropAllSchemas();
        TestHelper.dropDefaultReplicationSlot();
        initializeConnectorTestFramework();
        TestHelper.createDefaultReplicationSlot();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        TestHelper.createPublicationForAllTables();
    }

    @After
    public void after() {
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
    }

    protected Configuration.Builder config() {
        return TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal").with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10).with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1").with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source").with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5);
    }

    protected Configuration.Builder mutableConfig(boolean z, boolean z2) {
        return TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal").with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5).with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10).with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1").with(PostgresConnectorConfig.SNAPSHOT_MODE_TABLES, "s1.a").with(PostgresConnectorConfig.INCLUDE_SCHEMA_CHANGES, true);
    }

    protected Class<PostgresConnector> connectorClass() {
        return PostgresConnector.class;
    }

    protected JdbcConnection databaseConnection() {
        return TestHelper.create();
    }

    protected String topicName() {
        return TOPIC_NAME;
    }

    public List<String> topicNames() {
        return List.of(TOPIC_NAME, "test_server.s1.b");
    }

    protected String tableName() {
        return "s1.a";
    }

    protected List<String> tableNames() {
        return List.of("s1.a", "s1.b");
    }

    protected String signalTableName() {
        return "s1.debezium_signal";
    }

    protected String connector() {
        return "postgres";
    }

    protected String server() {
        return TestHelper.TEST_SERVER;
    }

    @Test
    @FixFor({"DBZ-7311"})
    public void executeBlockingSnapshotWhenSnapshotModeIsNever() throws Exception {
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
        populateTable();
        startConnector();
        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", AbstractSnapshotSignal.SnapshotType.BLOCKING, new String[]{tableDataCollectionId()});
        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
        assertRecordsFromSnapshotAndStreamingArePresent(1000, consumeRecordsByTopic(1000 + 1, 10));
        insertRecords(1000, 2000);
        assertStreamingRecordsArePresent(1000, consumeRecordsByTopic(1000, 10));
    }

    @Test
    @FixFor({"DBZ-7312"})
    public void executeBlockingSnapshotWhenASnapshotAlreadyExecuted() throws Exception {
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
        populateTable();
        startConnectorWithSnapshot(obj -> {
            return mutableConfig(true, true).with(CommonConnectorConfig.SNAPSHOT_MODE_TABLES, "not exist").with(PostgresConnectorConfig.SLOT_NAME, "snapshot_mode_initial_crash4");
        });
        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", AbstractSnapshotSignal.SnapshotType.BLOCKING, new String[]{tableDataCollectionId()});
        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
        assertRecordsFromSnapshotAndStreamingArePresent(1000, consumeRecordsByTopic(1000 + 1, 10));
        insertRecords(1000, 2000);
        assertStreamingRecordsArePresent(1000, consumeRecordsByTopic(1000, 10));
    }
}
