package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/postgresql/PostgresSkipMessagesWithoutChangeConfigIT.class */
public class PostgresSkipMessagesWithoutChangeConfigIT extends AbstractConnectorTest {
    @Before
    public void before() throws Exception {
        initializeConnectorTestFramework();
        TestHelper.dropAllSchemas();
    }

    @After
    public void after() {
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
    }

    @Test
    @FixFor({"DBZ-2979"})
    public void shouldSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabled() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS updates_test CASCADE;", "CREATE SCHEMA updates_test;", "CREATE TABLE updates_test.debezium_test (id int4 NOT NULL, white int, black int, PRIMARY KEY(id));", "ALTER TABLE updates_test.debezium_test REPLICA IDENTITY FULL;");
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.COLUMN_INCLUDE_LIST, "updates_test.debezium_test.id, updates_test.debezium_test.white").with(PostgresConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, true).with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER).build());
        waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
        TestHelper.execute("INSERT INTO updates_test.debezium_test (id,white,black) VALUES (1,1,1);", new String[0]);
        TestHelper.execute("UPDATE updates_test.debezium_test SET black=2 where id = 1;", new String[0]);
        TestHelper.execute("UPDATE updates_test.debezium_test SET white=2 where id = 1;", new String[0]);
        TestHelper.execute("UPDATE updates_test.debezium_test SET white=3, black=3 where id = 1;", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(3).recordsForTopic(TestHelper.topicName("updates_test.debezium_test"));
        Assertions.assertThat(recordsForTopic).hasSize(3);
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(1)).value()).getStruct("after").get("white")).isEqualTo(2);
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(2)).value()).getStruct("after").get("white")).isEqualTo(3);
    }

    @Test
    @FixFor({"DBZ-2979"})
    public void shouldSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabledWithExcludeConfig() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS updates_test CASCADE;", "CREATE SCHEMA updates_test;", "CREATE TABLE updates_test.debezium_test (id int4 NOT NULL, white int, black int, PRIMARY KEY(id));", "ALTER TABLE updates_test.debezium_test REPLICA IDENTITY FULL;");
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.COLUMN_EXCLUDE_LIST, "updates_test.debezium_test.black").with(PostgresConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, true).with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER).build());
        waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
        TestHelper.execute("INSERT INTO updates_test.debezium_test (id,white,black) VALUES (1,1,1);", new String[0]);
        TestHelper.execute("UPDATE updates_test.debezium_test SET black=2 where id = 1;", new String[0]);
        TestHelper.execute("UPDATE updates_test.debezium_test SET white=2 where id = 1;", new String[0]);
        TestHelper.execute("UPDATE updates_test.debezium_test SET white=3, black=3 where id = 1;", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(3).recordsForTopic(TestHelper.topicName("updates_test.debezium_test"));
        Assertions.assertThat(recordsForTopic).hasSize(3);
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(1)).value()).getStruct("after").get("white")).isEqualTo(2);
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(2)).value()).getStruct("after").get("white")).isEqualTo(3);
    }

    @Test
    @FixFor({"DBZ-2979"})
    public void shouldNotSkipEventsWithNoChangeInIncludedColumnsWhenSkipEnabledButTableReplicaIdentityNotFull() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS updates_test CASCADE;", "CREATE SCHEMA updates_test;", "CREATE TABLE updates_test.debezium_test (id int4 NOT NULL, white int, black int, PRIMARY KEY(id));");
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.COLUMN_INCLUDE_LIST, "updates_test.debezium_test.id, updates_test.debezium_test.white").with(PostgresConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, true).with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER).build());
        waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
        TestHelper.execute("INSERT INTO updates_test.debezium_test (id,white,black) VALUES (1,1,1);", new String[0]);
        TestHelper.execute("UPDATE updates_test.debezium_test SET black=2 where id = 1;", new String[0]);
        TestHelper.execute("UPDATE updates_test.debezium_test SET white=2 where id = 1;", new String[0]);
        TestHelper.execute("UPDATE updates_test.debezium_test SET white=3, black=3 where id = 1;", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(4).recordsForTopic(TestHelper.topicName("updates_test.debezium_test"));
        Assertions.assertThat(recordsForTopic).hasSize(4);
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(1)).value()).getStruct("after").get("white")).isEqualTo(1);
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(2)).value()).getStruct("after").get("white")).isEqualTo(2);
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(3)).value()).getStruct("after").get("white")).isEqualTo(3);
    }

    @Test
    @FixFor({"DBZ-2979"})
    public void shouldNotSkipEventsWithNoChangeInIncludedColumnsWhenSkipDisabled() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS updates_test CASCADE;", "CREATE SCHEMA updates_test;", "CREATE TABLE updates_test.debezium_test (id int4 NOT NULL, white int, black int, PRIMARY KEY(id));", "ALTER TABLE updates_test.debezium_test REPLICA IDENTITY FULL;");
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.COLUMN_INCLUDE_LIST, "updates_test.debezium_test.id, updates_test.debezium_test.white").with(PostgresConnectorConfig.SKIP_MESSAGES_WITHOUT_CHANGE, false).with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER).build());
        waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
        TestHelper.execute("INSERT INTO updates_test.debezium_test (id,white,black) VALUES (1,1,1);", new String[0]);
        TestHelper.execute("UPDATE updates_test.debezium_test SET black=2 where id = 1;", new String[0]);
        TestHelper.execute("UPDATE updates_test.debezium_test SET white=2 where id = 1;", new String[0]);
        TestHelper.execute("UPDATE updates_test.debezium_test SET white=3, black=3 where id = 1;", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(4).recordsForTopic(TestHelper.topicName("updates_test.debezium_test"));
        Assertions.assertThat(recordsForTopic).hasSize(4);
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(1)).value()).getStruct("after").get("white")).isEqualTo(1);
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(2)).value()).getStruct("after").get("white")).isEqualTo(2);
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(3)).value()).getStruct("after").get("white")).isEqualTo(3);
    }
}
