package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.data.SchemaAndValueField;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/postgresql/SnapshotIsolationIT.class */
public class SnapshotIsolationIT extends AbstractAsyncEngineConnectorTest {
    private static final String INSERT_STMT = "INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (2);INSERT INTO s1.a (aa) VALUES (3);INSERT INTO s2.a (aa) VALUES (4);";
    private static final String CREATE_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL NOT NULL PRIMARY KEY, aa integer);CREATE TABLE s2.a (pk SERIAL NOT NULL PRIMARY KEY, aa integer);";
    private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL NOT NULL PRIMARY KEY, aa integer);CREATE TABLE s2.a (pk SERIAL NOT NULL PRIMARY KEY, aa integer);INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (2);INSERT INTO s1.a (aa) VALUES (3);INSERT INTO s2.a (aa) VALUES (4);";

    @BeforeClass
    public static void beforeClass() throws SQLException {
        TestHelper.dropAllSchemas();
    }

    @Before
    public void before() {
        initializeConnectorTestFramework();
    }

    @After
    public void after() {
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
    }

    @Test
    public void takeSnapshotInSerializableMode() throws Exception {
        takeSnapshot(PostgresConnectorConfig.SnapshotIsolationMode.SERIALIZABLE);
    }

    @Test
    public void takeSnapshotInRepeatableReadMode() throws Exception {
        takeSnapshot(PostgresConnectorConfig.SnapshotIsolationMode.REPEATABLE_READ);
    }

    @Test
    public void takeSnapshotInReadCommittedMode() throws Exception {
        takeSnapshot(PostgresConnectorConfig.SnapshotIsolationMode.READ_COMMITTED);
    }

    @Test
    public void takeSnapshotInReadUncommittedMode() throws Exception {
        takeSnapshot(PostgresConnectorConfig.SnapshotIsolationMode.READ_UNCOMMITTED);
    }

    private void takeSnapshot(PostgresConnectorConfig.SnapshotIsolationMode snapshotIsolationMode) throws Exception {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_ISOLATION_MODE.name(), snapshotIsolationMode.getValue()).build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(4);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        Assertions.assertThat(recordsForTopic).hasSize(2);
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic2).hasSize(2);
        List of = List.of((SourceRecord) recordsForTopic.get(0), (SourceRecord) recordsForTopic2.get(0), (SourceRecord) recordsForTopic.get(1), (SourceRecord) recordsForTopic2.get(1));
        Schema build = SchemaBuilder.int32().defaultValue(0).build();
        int i = 0;
        while (i < 4) {
            SourceRecord sourceRecord = (SourceRecord) of.get(i);
            List<SchemaAndValueField> of2 = List.of(new SchemaAndValueField("pk", build, Integer.valueOf((i / 2) + 1)));
            List<SchemaAndValueField> of3 = List.of(new SchemaAndValueField("pk", build, Integer.valueOf((i / 2) + 1)), new SchemaAndValueField("aa", Schema.OPTIONAL_INT32_SCHEMA, Integer.valueOf(i + 1)));
            Struct struct = (Struct) sourceRecord.key();
            Struct struct2 = (Struct) sourceRecord.value();
            assertRecord(struct, of2);
            assertRecord((Struct) struct2.get("after"), of3);
            Assertions.assertThat(sourceRecord.sourceOffset()).extracting("snapshot").isEqualTo("INITIAL");
            Assertions.assertThat(sourceRecord.sourceOffset()).extracting("last_snapshot_record").isEqualTo(Boolean.valueOf(i == 3));
            Assert.assertNull(struct2.get("before"));
            i++;
        }
    }

    private void assertRecord(Struct struct, List<SchemaAndValueField> list) {
        list.forEach(schemaAndValueField -> {
            schemaAndValueField.assertFor(struct);
        });
    }
}
