package io.debezium.connector.postgresql;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Testing;
import java.sql.SQLException;
import java.util.Map;
import org.fest.assertions.Assertions;
import org.fest.assertions.MapAssert;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/postgresql/IncrementalSnapshotIT.class */
public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest<PostgresConnector> {
    private static final String TOPIC_NAME = "test_server.s1.a";
    private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s1.a4 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer, PRIMARY KEY(pk1, pk2, pk3, pk4));CREATE TABLE s1.a42 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer);CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048));";

    @Before
    public void before() throws SQLException {
        TestHelper.dropAllSchemas();
        initializeConnectorTestFramework();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
    }

    @After
    public void after() {
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
    }

    protected Configuration.Builder config() {
        return TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal").with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false).with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4");
    }

    protected Class<PostgresConnector> connectorClass() {
        return PostgresConnector.class;
    }

    protected JdbcConnection databaseConnection() {
        return TestHelper.create();
    }

    protected String topicName() {
        return TOPIC_NAME;
    }

    protected String tableName() {
        return "s1.a";
    }

    protected String signalTableName() {
        return "s1.debezium_signal";
    }

    protected void waitForConnectorToStart() {
        super.waitForConnectorToStart();
        TestHelper.waitForDefaultReplicationSlotBeActive();
    }

    @Test
    public void inserts4Pks() throws Exception {
        Testing.Print.enable();
        populate4PkTable();
        startConnector();
        sendAdHocSnapshotSignal(new String[]{"s1.a4"});
        Thread.sleep(5000L);
        JdbcConnection databaseConnection = databaseConnection();
        try {
            databaseConnection.setAutoCommit(false);
            for (int i = 0; i < 1000; i++) {
                int i2 = i + 1000 + 1;
                databaseConnection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, %s, %s, %s)", "s1.a4", Integer.valueOf(i2 / 1000), Integer.valueOf((i2 / 100) % 10), Integer.valueOf((i2 / 10) % 10), Integer.valueOf(i2 % 10), Integer.valueOf(i + 1000))});
            }
            databaseConnection.commit();
            if (databaseConnection != null) {
                databaseConnection.close();
            }
            Map consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(2000, entry -> {
                return true;
            }, struct -> {
                return Integer.valueOf((struct.getInt32("pk1").intValue() * 1000) + (struct.getInt32("pk2").intValue() * 100) + (struct.getInt32("pk3").intValue() * 10) + struct.getInt32("pk4").intValue());
            }, "test_server.s1.a4", null);
            for (int i3 = 0; i3 < 2000; i3++) {
                Assertions.assertThat(consumeMixedWithIncrementalSnapshot).includes(new MapAssert.Entry[]{MapAssert.entry(Integer.valueOf(i3 + 1), Integer.valueOf(i3))});
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void insertsWithoutPks() throws Exception {
        Testing.Print.enable();
        populate4WithoutPkTable();
        startConnector();
        sendAdHocSnapshotSignal(new String[]{"s1.a42"});
        Map consumeMixedWithIncrementalSnapshot = consumeMixedWithIncrementalSnapshot(1000, entry -> {
            return true;
        }, struct -> {
            return Integer.valueOf((struct.getInt32("pk1").intValue() * 1000) + (struct.getInt32("pk2").intValue() * 100) + (struct.getInt32("pk3").intValue() * 10) + struct.getInt32("pk4").intValue());
        }, "test_server.s1.a42", null);
        for (int i = 0; i < 1000; i++) {
            Assertions.assertThat(consumeMixedWithIncrementalSnapshot).includes(new MapAssert.Entry[]{MapAssert.entry(Integer.valueOf(i + 1), Integer.valueOf(i))});
        }
    }

    protected void populate4PkTable() throws SQLException {
        JdbcConnection databaseConnection = databaseConnection();
        try {
            populate4PkTable(databaseConnection, "s1.a4");
            if (databaseConnection != null) {
                databaseConnection.close();
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    protected void populate4WithoutPkTable() throws SQLException {
        JdbcConnection databaseConnection = databaseConnection();
        try {
            populate4PkTable(databaseConnection, "s1.a42");
            if (databaseConnection != null) {
                databaseConnection.close();
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
