package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.data.Uuid;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.transforms.outbox.EventRouter;
import java.util.Collections;
import java.util.HashMap;
import java.util.UUID;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.codehaus.jackson.map.ObjectMapper;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/postgresql/OutboxEventRouterIT.class */
/**
 * Integration test that captures inserts into the {@code outboxsmtit.outbox} table with the
 * Postgres connector and passes the resulting change records through the outbox
 * {@link EventRouter} transformation.
 */
public class OutboxEventRouterIT extends AbstractConnectorTest {
    private static final String SETUP_OUTBOX_SCHEMA = "DROP SCHEMA IF EXISTS outboxsmtit CASCADE;CREATE SCHEMA outboxsmtit;";
    private static final String SETUP_OUTBOX_TABLE = "CREATE TABLE outboxsmtit.outbox (  id            uuid         not null    constraint outbox_pk primary key,  aggregatetype varchar(255) not null,  aggregateid   varchar(255) not null,  type          varchar(255) not null,  payload       jsonb        not null);";

    /** Fully qualified name of the outbox table, as passed to {@code TestHelper.topicName}. */
    private static final String OUTBOX_TABLE = "outboxsmtit.outbox";

    private EventRouter<SourceRecord> outboxEventRouter;

    /**
     * Builds the {@code INSERT} statement for a single outbox row.
     *
     * @param eventId the value of the {@code id} column
     * @param eventType the value of the {@code type} column
     * @param aggregateType the value of the {@code aggregatetype} column
     * @param aggregateId the value of the {@code aggregateid} column
     * @param payloadJson the JSON text that is cast to {@code jsonb} for the {@code payload} column
     * @param additional raw SQL appended after the payload value; either empty or starting with a
     *            comma followed by the values of extra columns
     * @return the SQL statement
     */
    private static String createEventInsert(UUID eventId, String eventType, String aggregateType, String aggregateId,
                                            String payloadJson, String additional) {
        // Column order of the table is id, aggregatetype, aggregateid, type, payload,
        // which differs from the parameter order of this method.
        return String.format(
                "INSERT INTO outboxsmtit.outbox VALUES ('%s', '%s', '%s', '%s', '%s'::jsonb%s);",
                eventId, aggregateType, aggregateId, eventType, payloadJson, additional);
    }

    /**
     * Creates a router with the default configuration, recreates the outbox schema and table, and
     * starts a connector restricted to that table without taking a snapshot.
     */
    @Before
    public void beforeEach() {
        outboxEventRouter = new EventRouter<>();
        outboxEventRouter.configure(Collections.emptyMap());

        TestHelper.execute(SETUP_OUTBOX_SCHEMA);
        TestHelper.execute(SETUP_OUTBOX_TABLE);

        start(PostgresConnector.class, TestHelper.defaultConfig()
                .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue())
                .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE)
                .with(PostgresConnectorConfig.SCHEMA_WHITELIST, "outboxsmtit")
                .with(PostgresConnectorConfig.TABLE_WHITELIST, "outboxsmtit\\.outbox")
                .build());
        assertConnectorIsRunning();
    }

    /**
     * Stops the connector, verifies that no unconsumed records remain, and closes the router.
     */
    @After
    public void afterEach() {
        try {
            stopConnector();
            assertNoRecordsToConsume();
        }
        finally {
            // Close the router even when stopping the connector or the assertion fails.
            // The field may still be null if beforeEach() failed before assigning it.
            if (outboxEventRouter != null) {
                outboxEventRouter.close();
            }
        }
    }

    /**
     * Consumes exactly one change record, checks that it arrived on a single topic, and applies
     * the current router to it.
     *
     * @return the routed record, asserted to be non-null
     * @throws Exception if consuming the record fails
     */
    private SourceRecord consumeAndRouteSingleEvent() throws Exception {
        final AbstractConnectorTest.SourceRecords captured = consumeRecordsByTopic(1);
        Assertions.assertThat(captured.topics().size()).isEqualTo(1);

        final SourceRecord changeRecord = captured.recordsForTopic(TestHelper.topicName(OUTBOX_TABLE)).get(0);
        final SourceRecord routed = outboxEventRouter.apply(changeRecord);
        // Fail with an assertion error instead of a NullPointerException in the caller.
        Assertions.assertThat(routed).isNotNull();
        return routed;
    }

    @Test
    public void shouldConsumeRecordsFromInsert() throws Exception {
        TestHelper.execute(createEventInsert(
                UUID.fromString("59a42efd-b015-44a9-9dde-cb36d9002425"),
                "UserCreated",
                "User",
                "10711fa5",
                "{}",
                ""));

        final SourceRecord routed = consumeAndRouteSingleEvent();

        Assertions.assertThat(routed.topic()).isEqualTo("outbox.event.user");
    }

    @Test
    public void shouldRespectJsonFormatAsString() throws Exception {
        TestHelper.execute(createEventInsert(
                UUID.fromString("f9171eb6-19f3-4579-9206-0e179d2ebad7"),
                "UserCreated",
                "User",
                "7bdf2e9e",
                "{\"email\": \"gh@mefi.in\"}",
                ""));

        final SourceRecord routed = consumeAndRouteSingleEvent();

        // The payload field is read as a string and must parse as the JSON that was inserted.
        final Struct value = Requirements.requireStruct(routed.value(), "test payload");
        final String payloadJson = value.getString("payload");
        Assertions.assertThat(new ObjectMapper().readTree(payloadJson).get("email").getTextValue())
                .isEqualTo("gh@mefi.in");
    }

    @Test
    public void shouldSupportAllFeatures() throws Exception {
        // Replace the default router from beforeEach() with a fully configured one;
        // close the old instance first so it is not leaked.
        outboxEventRouter.close();
        outboxEventRouter = new EventRouter<>();
        final HashMap<String, String> config = new HashMap<>();
        config.put("table.field.event.schema.version", "version");
        config.put("table.field.event.timestamp", "createdat");
        config.put("table.fields.additional.placement",
                "version:envelope:eventVersion,aggregatetype:envelope:aggregateType,somebooltype:envelope:someBoolType,somebooltype:header");
        outboxEventRouter.configure(config);

        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add version int not null;");
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add somebooltype boolean not null;");
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add createdat timestamp without time zone not null;");
        TestHelper.execute(createEventInsert(
                UUID.fromString("f9171eb6-19f3-4579-9206-0e179d2ebad7"),
                "UserUpdated",
                "UserEmail",
                "7bdf2e9e",
                "{\"email\": \"gh@mefi.in\"}",
                ", 1, true, TIMESTAMP '2019-03-24 20:52:59'"));

        final SourceRecord routed = consumeAndRouteSingleEvent();

        // Record metadata
        Assertions.assertThat(routed.valueSchema().version()).isEqualTo(1);
        // 2019-03-24 20:52:59 is 1553460779 seconds after the epoch; the expected value is that
        // instant multiplied by 1,000,000.
        Assertions.assertThat(routed.timestamp()).isEqualTo(1553460779000000L);
        Assertions.assertThat(routed.topic()).isEqualTo("outbox.event.useremail");

        // Headers
        final Headers headers = routed.headers();
        Assertions.assertThat(headers.size()).isEqualTo(2);
        final Header idHeader = headers.lastWithName("id");
        Assertions.assertThat(idHeader.schema()).isEqualTo(Uuid.schema());
        Assertions.assertThat(idHeader.value()).isEqualTo("f9171eb6-19f3-4579-9206-0e179d2ebad7");
        final Header boolHeader = headers.lastWithName("somebooltype");
        Assertions.assertThat(boolHeader.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA);
        Assertions.assertThat(boolHeader.value()).isEqualTo(true);

        // Key
        Assertions.assertThat(routed.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
        Assertions.assertThat(routed.key()).isEqualTo("7bdf2e9e");

        // Envelope fields
        final Struct envelope = Requirements.requireStruct(routed.value(), "test envelope");
        Assertions.assertThat(envelope.getString("eventType")).isEqualTo("UserUpdated");
        Assertions.assertThat(envelope.getString("aggregateType")).isEqualTo("UserEmail");
        Assertions.assertThat(envelope.getInt32("eventVersion")).isEqualTo(1);
        Assertions.assertThat(envelope.getBoolean("someBoolType")).isEqualTo(true);
    }
}
