package io.debezium.connector.postgresql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.kafka.KafkaCluster;
import io.debezium.pipeline.signal.channels.KafkaSignalChannel;
import io.debezium.pipeline.source.snapshot.incremental.AbstractIncrementalSnapshotTest;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.io.File;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/postgresql/IncrementalSnapshotIT.class */
/**
 * Integration tests for incremental snapshots taken by the {@link PostgresConnector}.
 *
 * <p>Each test recreates the {@code s1} schema (see {@link #SETUP_TABLES_STMT}), starts the
 * connector with {@code snapshot.mode=never}, triggers an ad-hoc incremental snapshot via a
 * signal and then verifies the records emitted for the snapshotted table.
 */
public class IncrementalSnapshotIT extends AbstractIncrementalSnapshotTest<PostgresConnector> {
    private static final String TOPIC_NAME = "test_server.s1.a";

    /**
     * Row count the tests expect a single populate call to produce.
     * NOTE(review): assumed to match the row count used by the inherited populate helpers - confirm.
     */
    private static final int POPULATED_ROWS = 1000;

    /** DDL executed before every test; concatenation of the individual statements, run as one script. */
    private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;"
            + "CREATE SCHEMA s1; "
            + "CREATE SCHEMA s2; "
            + "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));"
            + "CREATE TABLE s1.b (pk SERIAL, aa integer, PRIMARY KEY(pk));"
            + "CREATE TABLE s1.a4 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer, PRIMARY KEY(pk1, pk2, pk3, pk4));"
            + "CREATE TABLE s1.a42 (pk1 integer, pk2 integer, pk3 integer, pk4 integer, aa integer);"
            + "CREATE TABLE s1.anumeric (pk numeric, aa integer, PRIMARY KEY(pk));"
            + "CREATE TABLE s1.debezium_signal (id varchar(64), type varchar(32), data varchar(2048));"
            + "ALTER TABLE s1.debezium_signal REPLICA IDENTITY FULL;"
            + "CREATE TYPE enum_type AS ENUM ('UP', 'DOWN', 'LEFT', 'RIGHT', 'STORY');"
            + "CREATE TABLE s1.enumpk (pk enum_type, aa integer, PRIMARY KEY(pk));";

    /**
     * Resets the database and the connector test framework, then recreates the test tables.
     *
     * @throws SQLException if a database operation fails
     */
    @Before
    public void before() throws SQLException {
        TestHelper.dropAllSchemas();
        initializeConnectorTestFramework();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.execute(SETUP_TABLES_STMT);
    }

    /**
     * Starts a single-broker Kafka cluster in a fresh testing directory and creates the
     * {@code signals_topic} topic (topic auto-creation is disabled in the broker config).
     *
     * @throws Exception if the cluster cannot be started
     */
    @BeforeClass
    public static void startKafka() throws Exception {
        File dataDir = Testing.Files.createTestingDirectory("signal_cluster");
        Testing.Files.delete(dataDir);
        kafka = new KafkaCluster()
                .usingDirectory(dataDir)
                .deleteDataPriorToStartup(true)
                .deleteDataUponShutdown(true)
                .addBrokers(1)
                .withKafkaConfiguration(Collect.propertiesOf(
                        "auto.create.topics.enable", "false",
                        "zookeeper.session.timeout.ms", "20000"))
                .startup();
        kafka.createTopic("signals_topic", 1, 1);
    }

    /** Shuts the Kafka cluster down if it was started. */
    @AfterClass
    public static void stopKafka() {
        if (kafka != null) {
            kafka.shutdown();
        }
    }

    /** Stops the connector and drops the default replication slot and the publication. */
    @After
    public void after() {
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
    }

    /**
     * Builds the connector configuration used by the tests: no initial snapshot, signalling
     * through the {@code s1.debezium_signal} table, chunk size 10 and only schema {@code s1}
     * captured. {@code s1.a42} has no primary key, so message key columns are set explicitly.
     *
     * @return the configuration builder
     */
    protected Configuration.Builder config() {
        return TestHelper.defaultConfig()
                .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue())
                .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
                .with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal")
                .with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
                .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1")
                .with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source")
                .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)
                .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4")
                .with("database.autosave", "conservative");
    }

    /**
     * Builds a configuration similar to {@link #config()} but with an explicit table include list
     * and without overriding the enabled signal channels.
     *
     * @param z when {@code true} only {@code s1.b} is included, otherwise {@code s1.a} and {@code s1.b}
     * @param z2 not used by this implementation
     * @return the configuration builder
     */
    protected Configuration.Builder mutableConfig(boolean z, boolean z2) {
        final String tableIncludeList = z ? "s1.b" : "s1.a,s1.b";
        return TestHelper.defaultConfig()
                .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue())
                .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
                .with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal")
                .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)
                .with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
                .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1")
                .with(RelationalDatabaseConnectorConfig.MSG_KEY_COLUMNS, "s1.a42:pk1,pk2,pk3,pk4")
                .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, tableIncludeList)
                .with("database.autosave", "conservative");
    }

    /** @return the connector class under test */
    protected Class<PostgresConnector> connectorClass() {
        return PostgresConnector.class;
    }

    /** @return a new connection created by {@link TestHelper#create()}; the caller must close it */
    protected JdbcConnection databaseConnection() {
        return TestHelper.create();
    }

    /** @return the topic for table {@code s1.a} */
    protected String topicName() {
        return TOPIC_NAME;
    }

    /** @return the topics for tables {@code s1.a} and {@code s1.b} */
    public List<String> topicNames() {
        return List.of(TOPIC_NAME, "test_server.s1.b");
    }

    /** @return the fully-qualified name of the primary test table */
    protected String tableName() {
        return "s1.a";
    }

    /** @return the fully-qualified names of both test tables */
    protected List<String> tableNames() {
        return List.of("s1.a", "s1.b");
    }

    /** @return the fully-qualified name of the signal table */
    protected String signalTableName() {
        return "s1.debezium_signal";
    }

    /** Waits for the connector as the base class does, then for the default replication slot to be active. */
    @Override
    protected void waitForConnectorToStart() {
        super.waitForConnectorToStart();
        TestHelper.waitForDefaultReplicationSlotBeActive();
    }

    /** @return the connector name, {@code "postgres"} */
    protected String connector() {
        return "postgres";
    }

    /** @return the logical server name defined by {@link TestHelper#TEST_SERVER} */
    protected String server() {
        return TestHelper.TEST_SERVER;
    }

    /**
     * Snapshots a table whose primary key is an enum type and checks that the rows are emitted
     * in the order of the enum declaration with the expected keys and values.
     */
    @Test
    @FixFor({"DBZ-6481"})
    public void insertsEnumPk() throws Exception {
        Testing.Print.enable();
        final List<String> enumValues = List.of("UP", "DOWN", "LEFT", "RIGHT", "STORY");

        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            for (int i = 0; i < enumValues.size(); i++) {
                connection.executeWithoutCommitting(String.format(
                        "INSERT INTO %s (%s, aa) VALUES (%s, %s)",
                        "s1.enumpk",
                        connection.quotedColumnIdString(pkFieldName()),
                        "'" + enumValues.get(i) + "'",
                        i));
            }
            connection.commit();
        }

        startConnector();
        sendAdHocSnapshotSignal("s1.enumpk");

        // Three records beyond the data rows are consumed; the data rows start at index 2.
        final List<SourceRecord> records = consumeRecordsByTopic(enumValues.size() + 3).allRecordsInOrder();
        for (int i = 0; i < enumValues.size(); i++) {
            final SourceRecord snapshotRecord = records.get(i + 2);
            Assertions.assertThat(((Struct) snapshotRecord.key()).getString("pk")).isEqualTo(enumValues.get(i));
            Assertions.assertThat(((Struct) snapshotRecord.value()).getStruct("after").getInt32("aa")).isEqualTo(i);
        }
    }

    /**
     * Snapshots the four-column-primary-key table while a second batch of rows is inserted,
     * and checks that all rows of both batches are captured.
     */
    @Test
    public void inserts4Pks() throws Exception {
        populate4PkTable();
        startConnector();
        sendAdHocSnapshotSignal("s1.a4");
        Thread.sleep(5000L);

        insertSecondBatchIntoFourPkTable();
        assertFourPartKeyTableCaptured("test_server.s1.a4", 2 * POPULATED_ROWS);
    }

    /**
     * Same scenario as {@link #inserts4Pks()} but the snapshot is requested through the Kafka
     * signal channel, which is enabled in addition to the source channel.
     */
    @Test
    public void inserts4PksWithKafkaSignal() throws Exception {
        populate4PkTable();
        startConnector(builder -> builder
                .with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "source,kafka")
                .with(KafkaSignalChannel.SIGNAL_TOPIC, getSignalsTopic())
                .with(KafkaSignalChannel.BOOTSTRAP_SERVERS, kafka.brokerList()));
        sendExecuteSnapshotKafkaSignal("s1.a4");
        Thread.sleep(5000L);

        insertSecondBatchIntoFourPkTable();
        assertFourPartKeyTableCaptured("test_server.s1.a4", 2 * POPULATED_ROWS);
    }

    /**
     * Snapshots the table without a primary key ({@code s1.a42}); the record key is derived
     * from the configured message key columns.
     */
    @Test
    public void insertsWithoutPks() throws Exception {
        populate4WithoutPkTable();
        startConnector();
        sendAdHocSnapshotSignal("s1.a42");

        assertFourPartKeyTableCaptured("test_server.s1.a42", POPULATED_ROWS);
    }

    /** Snapshots a table whose primary key is a {@code numeric} column. */
    @Test
    public void insertsNumericPk() throws Exception {
        try (JdbcConnection connection = databaseConnection()) {
            populateTable(connection, "s1.anumeric");
        }

        startConnector();
        sendAdHocSnapshotSignal("s1.anumeric");

        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
                POPULATED_ROWS,
                entry -> true,
                key -> VariableScaleDecimal.toLogical(key.getStruct("pk")).getWrappedValue().intValue(),
                this::afterValueOf,
                "test_server.s1.anumeric",
                null);
        assertContainsSequentialRows(dbChanges, 0, POPULATED_ROWS);
    }

    /**
     * Snapshots a range-partitioned table and both of its partitions, and checks that the parent
     * topic receives all rows while each partition topic receives its own half.
     */
    @Test
    @FixFor({"DBZ-5240"})
    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 11, reason = "Primary keys on partitioned tables are supported only on Postgres 11+")
    public void snapshotPartitionedTable() throws Exception {
        TestHelper.execute("CREATE TABLE s1.part (pk SERIAL, aa integer, PRIMARY KEY(pk, aa)) PARTITION BY RANGE (aa);"
                + "CREATE TABLE s1.part1 PARTITION OF s1.part FOR VALUES FROM (0) TO (500);"
                + "CREATE TABLE s1.part2 PARTITION OF s1.part FOR VALUES FROM (500) TO (1000);");

        try (JdbcConnection connection = databaseConnection()) {
            populateTable(connection, "s1.part");
        }

        startConnector(builder -> builder.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.part, s1.part1, s1.part2"));
        waitForConnectorToStart();

        sendAdHocSnapshotSignal("s1.part");
        sendAdHocSnapshotSignal("s1.part1");
        sendAdHocSnapshotSignal("s1.part2");

        final Map<Integer, Integer> parentChanges = consumeMixedWithIncrementalSnapshot(
                1000, entry -> true, key -> key.getInt32("pk"), this::afterValueOf, "test_server.s1.part", null);
        final Map<Integer, Integer> firstPartitionChanges = consumeMixedWithIncrementalSnapshot(
                500, entry -> true, key -> key.getInt32("pk"), this::afterValueOf, "test_server.s1.part1", null);
        final Map<Integer, Integer> secondPartitionChanges = consumeMixedWithIncrementalSnapshot(
                500, entry -> true, key -> key.getInt32("pk"), this::afterValueOf, "test_server.s1.part2", null);

        assertContainsSequentialRows(parentChanges, 0, 1000);
        assertContainsSequentialRows(firstPartitionChanges, 0, 500);
        assertContainsSequentialRows(secondPartitionChanges, 500, 1000);
    }

    /**
     * Checks that the {@code source} block of incremental snapshot records carries no
     * {@code xmin}, {@code lsn} or {@code txId} values.
     */
    @Test
    @FixFor({"DBZ-4329"})
    public void obsoleteSourceInfoIsExcludedFromRecord() throws Exception {
        populateTable();
        startConnector();
        sendAdHocSnapshotSignal();

        final Map<Integer, Struct> sourceByKey = consumeMixedWithIncrementalSnapshot(
                POPULATED_ROWS,
                snapshotRecord -> ((Struct) snapshotRecord.value()).getStruct("source"),
                entry -> true,
                null,
                topicName());
        final Set<Map.Entry<Integer, Struct>> entries = sourceByKey.entrySet();

        // The previous form, assertThat(1000 == size), created an assertion object but never
        // evaluated it, so the size was never actually verified.
        Assertions.assertThat(entries).hasSize(POPULATED_ROWS);
        for (Map.Entry<Integer, Struct> entry : entries) {
            final Struct source = entry.getValue();
            Assert.assertNull(source.getInt64("xmin"));
            Assert.assertNull(source.getInt64("lsn"));
            Assert.assertNull(source.getInt64("txId"));
        }
    }

    /**
     * Populates {@code s1.a4} using a connection that is closed afterwards.
     *
     * @throws SQLException if a database operation fails
     */
    protected void populate4PkTable() throws SQLException {
        try (JdbcConnection connection = databaseConnection()) {
            populate4PkTable(connection, "s1.a4");
        }
    }

    /**
     * Populates the primary-key-less table {@code s1.a42} using a connection that is closed afterwards.
     *
     * @throws SQLException if a database operation fails
     */
    protected void populate4WithoutPkTable() throws SQLException {
        try (JdbcConnection connection = databaseConnection()) {
            populate4PkTable(connection, "s1.a42");
        }
    }

    /**
     * Inserts a second batch of rows into {@code s1.a4} in one transaction. Row ids continue
     * after {@link #POPULATED_ROWS}; each id is split into its decimal digits for the four key columns.
     */
    private void insertSecondBatchIntoFourPkTable() throws SQLException {
        try (JdbcConnection connection = databaseConnection()) {
            connection.setAutoCommit(false);
            for (int i = 0; i < POPULATED_ROWS; i++) {
                final int id = i + POPULATED_ROWS + 1;
                connection.executeWithoutCommitting(String.format(
                        "INSERT INTO %s (pk1, pk2, pk3, pk4, aa) VALUES (%s, %s, %s, %s, %s)",
                        "s1.a4",
                        id / 1000,
                        (id / 100) % 10,
                        (id / 10) % 10,
                        id % 10,
                        i + POPULATED_ROWS));
            }
            connection.commit();
        }
    }

    /**
     * Consumes {@code expectedRows} records from {@code topic}, keyed by the id recomposed from
     * the four key columns, and asserts that every id {@code 1..expectedRows} maps to {@code id - 1}.
     */
    private void assertFourPartKeyTableCaptured(String topic, int expectedRows) throws Exception {
        final Map<Integer, Integer> dbChanges = consumeMixedWithIncrementalSnapshot(
                expectedRows,
                entry -> true,
                IncrementalSnapshotIT::fourPartKeyToId,
                this::afterValueOf,
                topic,
                null);
        assertContainsSequentialRows(dbChanges, 0, expectedRows);
    }

    /** Asserts that {@code changes} contains the entry {@code (i + 1, i)} for every {@code i} in {@code [from, to)}. */
    private static void assertContainsSequentialRows(Map<Integer, Integer> changes, int from, int to) {
        for (int i = from; i < to; i++) {
            Assertions.assertThat(changes).contains(Assertions.entry(i + 1, i));
        }
    }

    /** Recomposes the numeric id from the key columns {@code pk1..pk4}, treated as decimal digits. */
    private static Integer fourPartKeyToId(Struct key) {
        return key.getInt32("pk1") * 1000
                + key.getInt32("pk2") * 100
                + key.getInt32("pk3") * 10
                + key.getInt32("pk4");
    }

    /** Extracts the value column from the {@code after} block of a change record. */
    private Integer afterValueOf(SourceRecord changeRecord) {
        return ((Struct) changeRecord.value()).getStruct("after").getInt32(valueFieldName());
    }
}
