package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot;
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.Base64;
import java.util.List;
import junit.framework.TestCase;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/* loaded from: input_file:io/debezium/connector/postgresql/LogicalDecodingMessageIT.class */
public class LogicalDecodingMessageIT extends AbstractConnectorTest {
    private static final String CREATE_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));";
    private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));";

    @Rule
    public final TestRule skipName = new SkipTestDependingOnDecoderPluginNameRule();

    @BeforeClass
    public static void beforeClass() throws SQLException {
        TestHelper.dropAllSchemas();
    }

    @Before
    public void before() {
        initializeConnectorTestFramework();
    }

    @After
    public void after() {
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
    }

    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 14, minor = 0, reason = "Database Version less than 14")
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
    @FixFor({"DBZ-2363"})
    public void shouldNotConsumeLogicalDecodingMessagesWhenAllPrefixesAreInTheExcludedList() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));", new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.LOGICAL_DECODING_MESSAGE_PREFIX_EXCLUDE_LIST, ".*").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'prefix', 'content');", new String[0]);
        TestHelper.execute("INSERT into s1.a VALUES(201, 1);", new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("message"));
        Assertions.assertThat(recordsForTopic).hasSize(1);
        Assert.assertNull(recordsForTopic2);
    }

    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
    @FixFor({"DBZ-2363"})
    public void shouldConsumeNonTransactionalLogicalDecodingMessages() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));", new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', 'bar');", new String[0]);
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', E'bar'::bytea);", new String[0]);
        consumeRecordsByTopic(2).recordsForTopic(TestHelper.topicName("message")).forEach(sourceRecord -> {
            Struct struct = (Struct) sourceRecord.value();
            String string = struct.getString("op");
            Struct struct2 = struct.getStruct("source");
            Struct struct3 = struct.getStruct("message");
            Assert.assertNull(struct2.getInt64("txId"));
            Assert.assertNotNull(struct2.getInt64("ts_ms"));
            Assert.assertNotNull(struct2.getInt64("lsn"));
            TestCase.assertEquals("", struct2.getString("table"));
            TestCase.assertEquals("", struct2.getString("schema"));
            TestCase.assertEquals(Envelope.Operation.MESSAGE.code(), string);
            TestCase.assertEquals("foo", struct3.getString("prefix"));
            Assert.assertArrayEquals("bar".getBytes(), struct3.getBytes("content"));
        });
    }

    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
    @FixFor({"DBZ-2363"})
    public void shouldConsumeTransactionalLogicalDecodingMessages() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));", new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        TestHelper.execute("SELECT pg_logical_emit_message(true, 'txn_foo', 'txn_bar');", new String[0]);
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', E'txn_bar'::bytea);", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(TestHelper.topicName("message"));
        Assertions.assertThat(recordsForTopic).hasSize(1);
        recordsForTopic.forEach(sourceRecord -> {
            Struct struct = (Struct) sourceRecord.value();
            String string = struct.getString("op");
            Struct struct2 = struct.getStruct("source");
            Struct struct3 = struct.getStruct("message");
            Assert.assertNotNull(struct2.getInt64("txId"));
            Assert.assertNotNull(struct2.getInt64("ts_ms"));
            Assert.assertNotNull(struct2.getInt64("lsn"));
            TestCase.assertEquals("", struct2.getString("table"));
            TestCase.assertEquals("", struct2.getString("schema"));
            TestCase.assertEquals(Envelope.Operation.MESSAGE.code(), string);
            TestCase.assertEquals("txn_foo", struct3.getString("prefix"));
            Assert.assertArrayEquals("txn_bar".getBytes(), struct3.getBytes("content"));
        });
    }

    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
    @FixFor({"DBZ-2363"})
    public void shouldApplyBinaryHandlingMode() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));", new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.BINARY_HANDLING_MODE, "base64").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', E'txn_bar'::bytea);", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(TestHelper.topicName("message"));
        Assertions.assertThat(recordsForTopic).hasSize(1);
        Assertions.assertThat(new String(Base64.getEncoder().encode("txn_bar".getBytes("UTF-8")), "UTF-8")).isEqualTo(((Struct) ((SourceRecord) recordsForTopic.get(0)).value()).getStruct("message").getString("content"));
    }

    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 14, minor = 0, reason = "Database Version less than 14")
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
    @FixFor({"DBZ-2363"})
    public void shouldNotConsumeLogicalDecodingMessagesWithExcludedPrefixes() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));", new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.LOGICAL_DECODING_MESSAGE_PREFIX_EXCLUDE_LIST, "excluded_prefix, prefix:excluded").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'included_prefix', 'content');", new String[0]);
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'excluded_prefix', 'content');", new String[0]);
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'prefix:excluded', 'content');", new String[0]);
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'prefix:included', 'content');", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(2).recordsForTopic(TestHelper.topicName("message"));
        TestCase.assertEquals(2, recordsForTopic.size());
        Assertions.assertThat(getPrefix((SourceRecord) recordsForTopic.get(0))).isEqualTo("included_prefix");
        Assertions.assertThat(getPrefix((SourceRecord) recordsForTopic.get(1))).isEqualTo("prefix:included");
    }

    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 14, minor = 0, reason = "Database Version less than 14")
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
    @FixFor({"DBZ-2363"})
    public void shouldOnlyConsumeLogicalDecodingMessagesWithIncludedPrefixes() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));", new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.LOGICAL_DECODING_MESSAGE_PREFIX_INCLUDE_LIST, "included_prefix, prefix:included, ano.*er_included").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'included_prefix', 'content');", new String[0]);
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'excluded_prefix', 'content');", new String[0]);
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'prefix:excluded', 'content');", new String[0]);
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'prefix:included', 'content');", new String[0]);
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'another_included', 'content');", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(3).recordsForTopic(TestHelper.topicName("message"));
        Assertions.assertThat(recordsForTopic).hasSize(3);
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(0)).key()).getString("prefix")).isEqualTo("included_prefix");
        Assertions.assertThat(getPrefix((SourceRecord) recordsForTopic.get(0))).isEqualTo("included_prefix");
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(1)).key()).getString("prefix")).isEqualTo("prefix:included");
        Assertions.assertThat(getPrefix((SourceRecord) recordsForTopic.get(1))).isEqualTo("prefix:included");
        Assertions.assertThat(((Struct) ((SourceRecord) recordsForTopic.get(2)).key()).getString("prefix")).isEqualTo("another_included");
        Assertions.assertThat(getPrefix((SourceRecord) recordsForTopic.get(2))).isEqualTo("another_included");
    }

    private String getPrefix(SourceRecord sourceRecord) {
        return ((Struct) sourceRecord.value()).getStruct("message").getString("prefix");
    }

    private void waitForSnapshotToBeCompleted() throws InterruptedException {
        waitForSnapshotToBeCompleted("postgres", "test_server");
    }
}
