package io.debezium.connector.postgresql;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot;
import io.debezium.connector.postgresql.transforms.DecodeLogicalDecodingMessageContent;
import io.debezium.converters.AbstractCloudEventsConverterTest;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.transforms.outbox.EventRouter;
import java.util.LinkedHashMap;
import java.util.UUID;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.HeaderFrom;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/postgresql/CloudEventsConverterIT.class */
public class CloudEventsConverterIT extends AbstractCloudEventsConverterTest<PostgresConnector> {
    private static final String SETUP_SCHEMA = "DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1;";
    private static final String SETUP_TABLE = "CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));";
    private static final String SETUP_OUTBOX_SCHEMA = "DROP SCHEMA IF EXISTS outboxsmtit CASCADE;CREATE SCHEMA outboxsmtit;";
    private static final String SETUP_OUTBOX_TABLE = "CREATE TABLE outboxsmtit.outbox (  id            uuid         not null    constraint outbox_pk primary key,  aggregatetype varchar(255) not null,  aggregateid   varchar(255) not null,  event_type    varchar(255) not null,  payload       jsonb);";
    private static final String INSERT_STMT = "INSERT INTO s1.a (aa) VALUES (1);";

    @Before
    public void beforeEach() throws Exception {
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
        super.beforeEach();
    }

    protected Class<PostgresConnector> getConnectorClass() {
        return PostgresConnector.class;
    }

    protected String getConnectorName() {
        return "postgresql";
    }

    protected String getServerName() {
        return TestHelper.TEST_SERVER;
    }

    protected JdbcConnection databaseConnection() {
        return TestHelper.create();
    }

    protected Configuration.Builder getConfigurationBuilder() {
        return TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NO_DATA).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "outboxsmtit,s1").with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "outboxsmtit.outbox,s1.a");
    }

    protected String topicName() {
        return TestHelper.topicName("s1.a");
    }

    protected String topicNameOutbox() {
        return TestHelper.topicName("outboxsmtit.outbox");
    }

    protected String topicNameMessage() {
        return TestHelper.topicName("message");
    }

    protected void createTable() throws Exception {
        TestHelper.execute(SETUP_SCHEMA, new String[0]);
        TestHelper.execute(SETUP_TABLE, new String[0]);
    }

    protected void createOutboxTable() throws Exception {
        TestHelper.execute(SETUP_OUTBOX_SCHEMA, new String[0]);
        TestHelper.execute(SETUP_OUTBOX_TABLE, new String[0]);
    }

    protected String createInsert() {
        return INSERT_STMT;
    }

    protected String createInsertToOutbox(String str, String str2, String str3, String str4, String str5, String str6) {
        StringBuilder sb = new StringBuilder();
        sb.append("INSERT INTO outboxsmtit.outbox VALUES (");
        sb.append("'").append(UUID.fromString(str)).append("'");
        sb.append(", '").append(str3).append("'");
        sb.append(", '").append(str4).append("'");
        sb.append(", '").append(str2).append("'");
        if (str5 == null) {
            sb.append(", null::jsonb");
        } else if (str5.isEmpty()) {
            sb.append(", ''");
        } else {
            sb.append(", '").append(str5).append("'::jsonb");
        }
        if (str6 != null) {
            sb.append(str6);
        }
        sb.append(")");
        return sb.toString();
    }

    protected String createStatementToCallInsertToWalFunction(String str, String str2, String str3, String str4, String str5) {
        StringBuilder sb = new StringBuilder();
        sb.append("SELECT pg_logical_emit_message(true, 'foo', '");
        ObjectNode createObjectNode = new ObjectMapper().createObjectNode();
        createObjectNode.put("id", str);
        createObjectNode.put("type", str2);
        createObjectNode.put("aggregateType", str3);
        createObjectNode.put("aggregateId", str4);
        createObjectNode.put("payload", str5);
        sb.append(createObjectNode).append("');");
        return sb.toString();
    }

    protected void waitForStreamingStarted() throws InterruptedException {
        waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
    }

    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
    @FixFor({"DBZ-8103"})
    public void shouldConvertToCloudEventsInJsonWithDataAsJsonAndAllMetadataInHeadersAfterOutboxEventRouterAppliedToLogicalDecodingMessage() throws Exception {
        HeaderFrom.Value value = new HeaderFrom.Value();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("fields", "source,op");
        linkedHashMap.put("headers", "source,op");
        linkedHashMap.put("operation", "copy");
        linkedHashMap.put("header.converter.schemas.enable", "true");
        value.configure(linkedHashMap);
        DecodeLogicalDecodingMessageContent decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent();
        decodeLogicalDecodingMessageContent.configure(new LinkedHashMap());
        EventRouter eventRouter = new EventRouter();
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        linkedHashMap2.put("table.expand.json.payload", "true");
        linkedHashMap2.put("table.field.event.key", "aggregateId");
        linkedHashMap2.put("table.fields.additional.placement", "type:header");
        linkedHashMap2.put("route.by.field", "aggregateType");
        eventRouter.configure(linkedHashMap2);
        TestHelper.execute(createStatementToCallInsertToWalFunction("59a42efd-b015-44a9-9dde-cb36d9002425", "UserCreated", "User", "10711fa5", "{\"someField1\": \"some value 1\",\"someField2\": 7005}"), new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.allRecordsInOrder()).hasSize(1);
        SourceRecord apply = eventRouter.apply(decodeLogicalDecodingMessageContent.apply(value.apply((SourceRecord) consumeRecordsByTopic.recordsForTopic(topicNameMessage()).get(0))));
        Assertions.assertThat(apply).isNotNull();
        Assertions.assertThat(apply.topic()).isEqualTo("outbox.event.User");
        Assertions.assertThat(apply.keySchema()).isEqualTo(Schema.OPTIONAL_STRING_SCHEMA);
        Assertions.assertThat(apply.key()).isEqualTo("10711fa5");
        Assertions.assertThat(apply.value()).isInstanceOf(Struct.class);
        CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithMetadataAndIdAndTypeInHeaders(apply, getConnectorName(), getServerName());
        value.close();
        decodeLogicalDecodingMessageContent.close();
        eventRouter.close();
    }
}
