package io.debezium.connector.postgresql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
import io.debezium.custom.snapshotter.CustomTestSnapshot;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/* loaded from: input_file:io/debezium/connector/postgresql/CustomSnapshotterIT.class */
public class CustomSnapshotterIT extends AbstractAsyncEngineConnectorTest {
    private static final String INSERT_STMT = "INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);";
    private static final String CREATE_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));";
    private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);";

    @Rule
    public final TestRule skipName = new SkipTestDependingOnDecoderPluginNameRule();

    @BeforeClass
    public static void beforeClass() throws SQLException {
        TestHelper.dropAllSchemas();
    }

    @Before
    public void before() {
        initializeConnectorTestFramework();
    }

    @After
    public void after() {
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
    }

    @Test
    @FixFor({"DBZ-1082"})
    public void shouldAllowForCustomSnapshot() throws InterruptedException {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()).with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE, CommonConnectorConfig.SnapshotQueryMode.CUSTOM).with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic2).isNull();
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic.get(0), "pk", 1);
        TestHelper.execute(INSERT_STMT, new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
        List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic3.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic4.size()).isEqualTo(1);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic3.get(0), "pk", 2);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic4.get(0), "pk", 2);
        stopConnector();
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(PostgresConnectorConfig.SNAPSHOT_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()).with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE, CommonConnectorConfig.SnapshotQueryMode.CUSTOM).with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(4);
        List recordsForTopic5 = consumeRecordsByTopic3.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic6 = consumeRecordsByTopic3.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic5.size()).isEqualTo(2);
        Assertions.assertThat(recordsForTopic6.size()).isEqualTo(2);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic5.get(0), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic5.get(1), "pk", 2);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic6.get(0), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic6.get(1), "pk", 2);
    }

    @Test
    public void shouldAllowStreamOnlyByConfigurationBasedSnapshot() throws InterruptedException {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CONFIGURATION_BASED).with(CommonConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_DATA, false).with(CommonConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_SCHEMA, false).with(CommonConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_START_STREAM, true).with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()).build());
        assertConnectorIsRunning();
        assertNoRecordsToConsume();
        TestHelper.execute(INSERT_STMT, new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(1);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic.get(0), "pk", 2);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic2.get(0), "pk", 2);
        stopConnector();
    }

    @Test
    public void shouldNotAllowStreamByConfigurationBasedSnapshot() {
        LogInterceptor logInterceptor = new LogInterceptor(ChangeEventSourceCoordinator.class);
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CONFIGURATION_BASED).with(CommonConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_DATA, false).with(CommonConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_SNAPSHOT_SCHEMA, false).with(CommonConnectorConfig.SNAPSHOT_MODE_CONFIGURATION_BASED_START_STREAM, false).with(PostgresConnectorConfig.SNAPSHOT_QUERY_MODE_CUSTOM_NAME, CustomTestSnapshot.class.getName()).build());
        assertConnectorIsRunning();
        assertNoRecordsToConsume();
        TestHelper.execute(INSERT_STMT, new String[0]);
        assertNoRecordsToConsume();
        waitForConnectorShutdown("postgres", TestHelper.TEST_SERVER);
        Assertions.assertThat(logInterceptor.containsMessage("Streaming is disabled for snapshot mode configuration_based")).isTrue();
        stopConnector();
    }
}
