package io.debezium.connector.postgresql;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.processors.AbstractReselectProcessorTest;
import io.debezium.processors.reselect.ReselectColumnsPostProcessor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/postgresql/PostgresReselectColumnsProcessorIT.class */
/**
 * Integration tests for {@link ReselectColumnsPostProcessor} running against the PostgreSQL connector.
 *
 * <p>The shared scenarios come from {@link AbstractReselectProcessorTest}; this class supplies the
 * PostgreSQL-specific table, topic and statements, and adds tests that insert a large column value,
 * update an unrelated column, and then assert that the update event still carries the large value
 * and that a re-select for an unavailable value was logged.
 */
public class PostgresReselectColumnsProcessorIT extends AbstractReselectProcessorTest<PostgresConnector> {
    /** Statement that recreates the {@code s1} schema used by every test in this class. */
    public static final String CREATE_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1; ";

    /** Connection handed to the base class through {@link #databaseConnection()}; opened per test. */
    private PostgresConnection connection;

    /**
     * Resets the database to an empty {@code s1} schema, opens the test connection and then runs
     * the base-class setup.
     *
     * @throws Exception if any of the setup steps fails
     */
    @Before
    public void beforeEach() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.execute(CREATE_STMT);
        this.connection = TestHelper.create();
        super.beforeEach();
    }

    /**
     * Runs the base-class teardown, drops the default replication slot and the publication, and
     * finally closes the connection opened in {@link #beforeEach()}.
     *
     * @throws Exception if any of the teardown steps fails
     */
    @After
    public void afterEach() throws Exception {
        try {
            super.afterEach();
            TestHelper.dropDefaultReplicationSlot();
            TestHelper.dropPublication();
        } finally {
            // The connection is created once per test, so release it even when teardown fails.
            // The null check covers the case where setup failed before the connection was opened.
            if (this.connection != null) {
                this.connection.close();
                this.connection = null;
            }
        }
    }

    /** @return the connector class under test */
    protected Class<PostgresConnector> getConnectorClass() {
        return PostgresConnector.class;
    }

    /** @return the connection opened in {@link #beforeEach()} */
    protected JdbcConnection databaseConnection() {
        return this.connection;
    }

    /**
     * Builds the default test configuration restricted to {@code s1.dbz4321} with the re-select
     * post processor registered under the name {@code reselector}.
     *
     * @return the configuration builder; individual tests override the table include list on it
     */
    protected Configuration.Builder getConfigurationBuilder() {
        return TestHelper.defaultConfig()
                .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1\\.dbz4321")
                .with(PostgresConnectorConfig.CUSTOM_POST_PROCESSORS, "reselector")
                .with("reselector.type", ReselectColumnsPostProcessor.class.getName());
    }

    /** @return the topic that receives change events for the shared test table */
    protected String topicName() {
        return "test_server.s1.dbz4321";
    }

    /** @return the schema-qualified name of the shared test table */
    protected String tableName() {
        return "s1.dbz4321";
    }

    /** @return the {@code table:column} entry naming the column of the shared table to re-select */
    protected String reselectColumnsList() {
        return "s1.dbz4321:data";
    }

    /**
     * Creates the shared test table and switches it to {@code REPLICA IDENTITY FULL}.
     *
     * @throws Exception if a statement fails
     */
    protected void createTable() throws Exception {
        TestHelper.execute("CREATE TABLE s1.dbz4321 (id int primary key, data varchar(50), data2 int);");
        TestHelper.execute("ALTER TABLE s1.dbz4321 REPLICA IDENTITY FULL;");
    }

    /**
     * Intentionally a no-op: {@link #beforeEach()} drops all schemas and recreates {@code s1}
     * before every test, which removes any table left behind.
     */
    protected void dropTable() throws Exception {
    }

    /** @return an insert into the shared table with a non-null {@code data} value */
    protected String getInsertWithValue() {
        return "INSERT INTO s1.dbz4321 (id,data,data2) values (1,'one',1);";
    }

    /** @return an insert into the shared table with a null {@code data} value */
    protected String getInsertWithNullValue() {
        return "INSERT INTO s1.dbz4321 (id,data,data2) values (1,null,1);";
    }

    /**
     * Blocks until streaming is reported as running for the {@code postgres} connector on the
     * test server.
     *
     * @throws InterruptedException if the wait is interrupted
     */
    protected void waitForStreamingStarted() throws InterruptedException {
        waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
    }

    /**
     * Inserts a 10000-character {@code text} value, updates a different column, and verifies that
     * both events carry the full text and that the column was re-selected.
     */
    @Test
    @FixFor("DBZ-4321")
    public void testToastColumnReselectedWhenValueIsUnavailable() throws Exception {
        // NOTE(review): unlike the shared table, no REPLICA IDENTITY FULL is set here — presumably
        // so the unchanged large value is missing from the raw update event; confirm.
        TestHelper.execute("CREATE TABLE s1.dbz4321_toast (id int primary key, data text, data2 int);");
        LogInterceptor interceptor = getReselectLogInterceptor();

        start(PostgresConnector.class, getConfigurationBuilder()
                .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1\\.dbz4321_toast")
                .build());
        waitForStreamingStarted();

        String largeText = RandomStringUtils.randomAlphabetic(10000);
        TestHelper.execute(
                "INSERT INTO s1.dbz4321_toast (id,data,data2) values (1,'" + largeText + "',1);",
                "UPDATE s1.dbz4321_toast SET data2 = 2 where id = 1;");

        assertInsertThenUpdateCarryData("test_server.s1.dbz4321_toast", largeText);
        assertColumnReselectedForUnavailableValue(interceptor, "s1.dbz4321_toast", "data");
    }

    /**
     * Same scenario with an {@code hstore} column emitted in {@code MAP} handling mode; the
     * expected value is a single-entry map.
     */
    @Test
    @FixFor("DBZ-7596")
    public void testToastColumnHstoreAsMapReselectedWhenValueIsUnavailable() throws Exception {
        TestHelper.execute("CREATE TABLE s1.dbz7596_toast (id int primary key, data hstore, data2 int);");
        LogInterceptor interceptor = getReselectLogInterceptor();
        String largeValue = RandomStringUtils.randomAlphanumeric(8192);

        start(PostgresConnector.class, getConfigurationBuilder()
                .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1\\.dbz7596_toast")
                .with(PostgresConnectorConfig.HSTORE_HANDLING_MODE, PostgresConnectorConfig.HStoreHandlingMode.MAP.getValue())
                .build());
        waitForStreamingStarted();

        TestHelper.execute(
                "INSERT INTO s1.dbz7596_toast (id,data,data2) values (1,'\"key\"=>\"" + largeValue + "\"', 1);",
                "UPDATE s1.dbz7596_toast SET data2 = 2 where id = 1;");

        assertInsertThenUpdateCarryData("test_server.s1.dbz7596_toast", Map.of("key", largeValue));
        assertColumnReselectedForUnavailableValue(interceptor, "s1.dbz7596_toast", "data");
    }

    /**
     * Same scenario with an {@code hstore} column emitted in {@code JSON} handling mode; the
     * expected value is the JSON object rendered as a string.
     */
    @Test
    @FixFor("DBZ-7596")
    public void testToastColumnHstoreAsJsonReselectedWhenValueIsUnavailable() throws Exception {
        TestHelper.execute("CREATE TABLE s1.dbz7596_toast (id int primary key, data hstore, data2 int);");
        LogInterceptor interceptor = getReselectLogInterceptor();
        String largeValue = RandomStringUtils.randomAlphanumeric(8192);
        String expectedJson = "{\"key\":\"" + largeValue + "\"}";

        start(PostgresConnector.class, getConfigurationBuilder()
                .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1\\.dbz7596_toast")
                .with(PostgresConnectorConfig.HSTORE_HANDLING_MODE, PostgresConnectorConfig.HStoreHandlingMode.JSON.getValue())
                .build());
        waitForStreamingStarted();

        TestHelper.execute(
                "INSERT INTO s1.dbz7596_toast (id,data,data2) values (1,'\"key\"=>\"" + largeValue + "\"', 1);",
                "UPDATE s1.dbz7596_toast SET data2 = 2 where id = 1;");

        assertInsertThenUpdateCarryData("test_server.s1.dbz7596_toast", expectedJson);
        assertColumnReselectedForUnavailableValue(interceptor, "s1.dbz7596_toast", "data");
    }

    /**
     * Same scenario with a {@code text[]} column holding two 8192-character elements; the expected
     * value is the list of those elements.
     */
    @Test
    @FixFor("DBZ-7596")
    public void testToastColumnArrayReselectedWhenValueIsUnavailable() throws Exception {
        TestHelper.execute("CREATE TABLE s1.dbz7596_toast (id int primary key, data text[], data2 int);");
        LogInterceptor interceptor = getReselectLogInterceptor();

        List<String> elements = new ArrayList<>();
        elements.add(RandomStringUtils.randomAlphanumeric(8192));
        elements.add(RandomStringUtils.randomAlphanumeric(8192));
        // Render the elements as the inside of an array literal: "a", "b"
        String arrayLiteralBody = elements.stream()
                .map(element -> "\"" + element + "\"")
                .collect(Collectors.joining(", "));

        start(PostgresConnector.class, getConfigurationBuilder()
                .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1\\.dbz7596_toast")
                .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, "true")
                .build());
        waitForStreamingStarted();

        TestHelper.execute(
                "INSERT INTO s1.dbz7596_toast (id,data,data2) values (1,'{" + arrayLiteralBody + "}', 1);",
                "UPDATE s1.dbz7596_toast SET data2 = 2 where id = 1;");

        assertInsertThenUpdateCarryData("test_server.s1.dbz7596_toast", elements);
        assertColumnReselectedForUnavailableValue(interceptor, "s1.dbz7596_toast", "data");
    }

    /**
     * Consumes two records and verifies that the first is a valid insert and the second a valid
     * update for {@code id = 1}, that both {@code after} states carry {@code expectedData} in the
     * {@code data} column, and that {@code data2} is 1 on the insert and 2 on the update.
     *
     * @param topic the topic the two records are expected on
     * @param expectedData the value expected in the {@code data} field of both events
     * @throws Exception if consuming the records fails
     */
    private void assertInsertThenUpdateCarryData(String topic, Object expectedData) throws Exception {
        List<SourceRecord> records = consumeRecordsByTopic(2).recordsForTopic(topic);

        SourceRecord insert = records.get(0);
        Struct insertAfter = ((Struct) insert.value()).getStruct("after");
        VerifyRecord.isValidInsert(insert, "id", 1);
        assertAfterState(insertAfter, expectedData, 1);

        SourceRecord update = records.get(1);
        Struct updateAfter = ((Struct) update.value()).getStruct("after");
        VerifyRecord.isValidUpdate(update, "id", 1);
        assertAfterState(updateAfter, expectedData, 2);
    }

    /**
     * Asserts the three columns of an {@code after} state for the row with {@code id = 1}.
     *
     * @param after the {@code after} struct of a change event
     * @param expectedData the value expected in the {@code data} field
     * @param expectedData2 the value expected in the {@code data2} field
     */
    private static void assertAfterState(Struct after, Object expectedData, int expectedData2) {
        Assertions.assertThat(after.get("id")).isEqualTo(1);
        Assertions.assertThat(after.get("data")).isEqualTo(expectedData);
        Assertions.assertThat(after.get("data2")).isEqualTo(expectedData2);
    }
}
