package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.PostgresDefaultValueConverter;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Integration tests checking how column {@code DEFAULT} clauses of PostgreSQL tables are reflected
 * in emitted change events, both in the row values and in the default values of the {@code after}
 * field schemas.
 *
 * <p>Each test creates its own schemas/tables through {@link TestHelper}, starts a
 * {@link PostgresConnector} and inspects the consumed {@link SourceRecord}s.
 */
public class PostgresDefaultValueConverterIT extends AbstractConnectorTest {

    /** Name of the envelope field that holds the row state after the change. */
    private static final String AFTER = "after";

    /** Topic on which events for table {@code s1.a} are asserted. */
    private static final String TOPIC_S1_A = "test_server.s1.a";

    /** Initializes the connector test framework and drops all schemas before every test. */
    @Before
    public void before() throws SQLException {
        initializeConnectorTestFramework();
        TestHelper.dropAllSchemas();
    }

    /** Stops the connector, then drops the default replication slot and the publication. */
    @After
    public void after() {
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
    }

    /**
     * The table and its single row exist before the connector starts, so the one consumed record
     * is expected to be available once the snapshot has completed.
     */
    @Test
    @FixFor({"DBZ-4736", "DBZ-5384"})
    public void shouldSetTheNullValueInSnapshot() throws Exception {
        createTableAndInsertData();
        startConnectorAndAwaitSnapshot();
        consumeSingleRecordAndVerifyDefaults();
    }

    /**
     * The table and its single row are created only after the snapshot has completed, so the one
     * consumed record originates from a change made while the connector is already running.
     */
    @Test
    @FixFor({"DBZ-4736", "DBZ-5384"})
    public void shouldSetTheNullValueInStreaming() throws Exception {
        startConnectorAndAwaitSnapshot();
        createTableAndInsertData();
        consumeSingleRecordAndVerifyDefaults();
    }

    /**
     * Uses a column default that calls a function qualified with a different schema
     * ({@code s2.tst_generate_random_uuid()}) and verifies that no "Cannot parse column default
     * value" message was logged by {@link PostgresDefaultValueConverter}, that the inserted row
     * carries a generated id, and that the schema default of {@code id} is the all-zero UUID
     * string.
     */
    @Test
    @FixFor({"DBZ-5340"})
    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 13, reason = "gen_random_uuid() available on PG13+ without explicitly using pgcrypto")
    public void testShouldHandleDefaultValueFunctionsWithSchemaPrefixes() throws Exception {
        // Install the interceptor before the connector starts so messages logged during startup are captured.
        LogInterceptor logInterceptor = new LogInterceptor(PostgresDefaultValueConverter.class);

        final String ddl = "DROP SCHEMA IF EXISTS s1 CASCADE;"
                + "CREATE SCHEMA s1;"
                + "CREATE SCHEMA s2;"
                + "CREATE OR REPLACE FUNCTION s2.tst_generate_random_uuid() returns uuid as 'select gen_random_uuid()' language sql;"
                + "CREATE TABLE s1.dbz5340 (id uuid default s2.tst_generate_random_uuid() not null, data text);"
                + "ALTER TABLE s1.dbz5340 REPLICA IDENTITY FULL;";
        TestHelper.execute(ddl, new String[0]);

        start(PostgresConnector.class, TestHelper.defaultConfig().build());
        waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);

        Assertions.assertThat(logInterceptor.containsMessage("Cannot parse column default value 's2.tst_generate_random_uuid()' to type 'uuid'")).isFalse();

        TestHelper.execute("INSERT INTO s1.dbz5340 (data) values ('test');", new String[0]);

        List<SourceRecord> records = consumeRecordsByTopic(1).recordsForTopic(TestHelper.topicName("s1.dbz5340"));
        Assertions.assertThat(records).hasSize(1);

        Struct after = afterOf(records.get(0));
        Assertions.assertThat(after.get("id")).isNotNull();
        Assertions.assertThat(after.get("data")).isEqualTo("test");
        // The row value is generated by the database; the schema default is expected to be the all-zero UUID.
        Assertions.assertThat(after.schema().field("id").schema().defaultValue()).isEqualTo("00000000-0000-0000-0000-000000000000");
    }

    /**
     * Starts the connector with snapshot mode {@code INITIAL} and schema include list {@code s1},
     * asserts that it is running and blocks until the snapshot has completed.
     */
    private void startConnectorAndAwaitSnapshot() throws Exception {
        start(PostgresConnector.class, TestHelper.defaultConfig()
                .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue())
                .with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1")
                .build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("postgres", TestHelper.TEST_SERVER);
    }

    /**
     * Consumes exactly one record, asserts that the {@code s1.a} topic received exactly one record
     * and verifies the first consumed record with
     * {@link #assertDefaultValueChangeRecord(SourceRecord)}.
     */
    private void consumeSingleRecordAndVerifyDefaults() throws Exception {
        SourceRecords records = consumeRecordsByTopic(1);
        Assertions.assertThat(records.recordsForTopic(TOPIC_S1_A)).hasSize(1);

        List<SourceRecord> inOrder = records.allRecordsInOrder();
        assertDefaultValueChangeRecord(inOrder.get(0));
    }

    /**
     * (Re)creates schema {@code s1} with table {@code s1.a}, whose columns cover several ways of
     * spelling a default (typed and untyped {@code NULL}, the string literal {@code 'NULL'},
     * ordinary literals and {@code CURRENT_*} expressions), then inserts one row that supplies
     * only the primary key so that every other column takes its default.
     */
    private void createTableAndInsertData() {
        final String ddl = "DROP SCHEMA IF EXISTS s1 CASCADE;"
                + "CREATE SCHEMA s1; "
                + "CREATE TABLE s1.a ("
                + "pk SERIAL, "
                + "dint integer DEFAULT NULL::integer, "
                + "dvc1 varchar(64) DEFAULT NULL::character varying, "
                + "dvc2 varchar(64) DEFAULT 'NULL', "
                + "dvc3 varchar(64) DEFAULT 'MYVALUE', "
                + "dvc4 varchar(64) DEFAULT 'NULL'::character varying, "
                + "dvc5 varchar(64) DEFAULT 'NULL::character varying', "
                + "dvc6 varchar(64) DEFAULT NULL, "
                + "dt1 timestamp DEFAULT CURRENT_TIMESTAMP, "
                + "dt2 date DEFAULT CURRENT_DATE, "
                + "dt3 time DEFAULT CURRENT_TIME, "
                + "PRIMARY KEY(pk));";
        TestHelper.execute(ddl, new String[0]);
        TestHelper.execute("INSERT INTO s1.a (pk) VALUES (1);", new String[0]);
    }

    /**
     * Verifies a change record for table {@code s1.a}: first the values found in the
     * {@code after} struct, then the default values declared on the corresponding field schemas.
     *
     * @param sourceRecord the change record produced for the row inserted by
     *                     {@link #createTableAndInsertData()}
     */
    private void assertDefaultValueChangeRecord(SourceRecord sourceRecord) {
        Struct after = afterOf(sourceRecord);
        Schema afterSchema = sourceRecord.valueSchema().field(AFTER).schema();

        // Row values: NULL defaults yield null, quoted 'NULL' stays a plain string.
        Assertions.assertThat(after.getInt32("dint")).isNull();
        Assertions.assertThat(after.getString("dvc1")).isNull();
        Assertions.assertThat(after.getString("dvc2")).isEqualTo("NULL");
        Assertions.assertThat(after.getString("dvc3")).isEqualTo("MYVALUE");
        Assertions.assertThat(after.getString("dvc4")).isEqualTo("NULL");
        Assertions.assertThat(after.getString("dvc5")).isEqualTo("NULL::character varying");
        Assertions.assertThat(after.getString("dvc6")).isNull();
        // CURRENT_* defaults are evaluated by the database, so only their presence can be checked.
        Assertions.assertThat(after.getInt64("dt1")).isNotNull();
        Assertions.assertThat(after.getInt32("dt2")).isNotNull();
        Assertions.assertThat(after.getInt64("dt3")).isNotNull();

        // Schema defaults for the same columns.
        Assertions.assertThat(afterSchema.field("dint").schema().defaultValue()).isNull();
        Assertions.assertThat(afterSchema.field("dvc1").schema().defaultValue()).isNull();
        Assertions.assertThat(afterSchema.field("dvc2").schema().defaultValue()).isEqualTo("NULL");
        Assertions.assertThat(afterSchema.field("dvc3").schema().defaultValue()).isEqualTo("MYVALUE");
        Assertions.assertThat(afterSchema.field("dvc4").schema().defaultValue()).isEqualTo("NULL");
        Assertions.assertThat(afterSchema.field("dvc5").schema().defaultValue()).isEqualTo("NULL::character varying");
        Assertions.assertThat(afterSchema.field("dvc6").schema().defaultValue()).isNull();
        // For the CURRENT_* columns the schema default is expected to be zero.
        Assertions.assertThat(afterSchema.field("dt1").schema().defaultValue()).isEqualTo(0L);
        Assertions.assertThat(afterSchema.field("dt2").schema().defaultValue()).isEqualTo(0);
        Assertions.assertThat(afterSchema.field("dt3").schema().defaultValue()).isEqualTo(0L);
    }

    /**
     * Returns the {@code after} struct of a change record's value.
     *
     * @param record a record whose value is a {@link Struct} containing an {@code after} field
     * @return the nested {@code after} struct
     */
    private static Struct afterOf(SourceRecord record) {
        return ((Struct) record.value()).getStruct(AFTER);
    }
}
