package io.debezium.connector.postgresql;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.data.Json;
import io.debezium.data.Uuid;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.transforms.outbox.EventRouter;
import java.util.Collections;
import java.util.HashMap;
import java.util.UUID;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.util.Requirements;
import org.codehaus.jackson.map.ObjectMapper;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/postgresql/OutboxEventRouterIT.class */
public class OutboxEventRouterIT extends AbstractConnectorTest {
    private static final String SETUP_OUTBOX_SCHEMA = "DROP SCHEMA IF EXISTS outboxsmtit CASCADE;CREATE SCHEMA outboxsmtit;";
    private static final String SETUP_OUTOBOX_TABLE = "CREATE TABLE outboxsmtit.outbox (  id            uuid         not null    constraint outbox_pk primary key,  aggregatetype varchar(255) not null,  aggregateid   varchar(255) not null,  type          varchar(255) not null,  payload       jsonb);";
    private EventRouter<SourceRecord> outboxEventRouter;

    private static String createEventInsert(UUID uuid, String str, String str2, String str3, String str4, String str5) {
        StringBuilder sb = new StringBuilder();
        sb.append("INSERT INTO outboxsmtit.outbox VALUES (");
        sb.append("'").append(uuid).append("'");
        sb.append(", '").append(str2).append("'");
        sb.append(", '").append(str3).append("'");
        sb.append(", '").append(str).append("'");
        if (str4 == null) {
            sb.append(", null::jsonb");
        } else if (str4.isEmpty()) {
            sb.append(", ''");
        } else {
            sb.append(", '").append(str4).append("'::jsonb");
        }
        if (str5 != null) {
            sb.append(str5);
        }
        sb.append(")");
        return sb.toString();
    }

    @Before
    public void beforeEach() throws InterruptedException {
        this.outboxEventRouter = new EventRouter<>();
        this.outboxEventRouter.configure(Collections.emptyMap());
        TestHelper.execute(SETUP_OUTBOX_SCHEMA, new String[0]);
        TestHelper.execute(SETUP_OUTOBOX_TABLE, new String[0]);
    }

    @After
    public void afterEach() {
        stopConnector();
        assertNoRecordsToConsume();
        this.outboxEventRouter.close();
    }

    @Test
    public void shouldConsumeRecordsFromInsert() throws Exception {
        startConnectorWithInitialSnapshotRecord();
        TestHelper.execute(createEventInsert(UUID.fromString("59a42efd-b015-44a9-9dde-cb36d9002425"), "UserCreated", "User", "10711fa5", "{}", ""), new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(1);
        SourceRecord apply = this.outboxEventRouter.apply((SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("outboxsmtit.outbox")).get(0));
        Assertions.assertThat(apply).isNotNull();
        Assertions.assertThat(apply.topic()).isEqualTo("outbox.event.user");
        Struct requireStruct = Requirements.requireStruct(apply.value(), "test payload");
        Assertions.assertThat(requireStruct.getString("eventType")).isEqualTo("UserCreated");
        Assertions.assertThat(new ObjectMapper().readTree(requireStruct.getString("payload")).get("email")).isEqualTo((Object) null);
    }

    @Test
    public void shouldSendEventTypeAsHeader() throws Exception {
        startConnectorWithInitialSnapshotRecord();
        TestHelper.execute(createEventInsert(UUID.fromString("59a42efd-b015-44a9-9dde-cb36d9002425"), "UserCreated", "User", "10711fa5", "{\"email\": \"gh@mefi.in\"}", ""), new String[0]);
        HashMap hashMap = new HashMap();
        hashMap.put("table.fields.additional.placement", "type:header:eventType");
        this.outboxEventRouter.configure(hashMap);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(1);
        SourceRecord apply = this.outboxEventRouter.apply((SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("outboxsmtit.outbox")).get(0));
        Assertions.assertThat(apply).isNotNull();
        Assertions.assertThat(apply.topic()).isEqualTo("outbox.event.user");
        Object value = apply.value();
        Assertions.assertThat(apply.headers().lastWithName("eventType").value()).isEqualTo("UserCreated");
        Assertions.assertThat(value).isInstanceOf(String.class);
        Assertions.assertThat(new ObjectMapper().readTree((String) value).get("email").getTextValue()).isEqualTo("gh@mefi.in");
    }

    @Test
    public void shouldRespectJsonFormatAsString() throws Exception {
        startConnectorWithInitialSnapshotRecord();
        TestHelper.execute(createEventInsert(UUID.fromString("f9171eb6-19f3-4579-9206-0e179d2ebad7"), "UserCreated", "User", "7bdf2e9e", "{\"email\": \"gh@mefi.in\"}", ""), new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(1);
        Struct requireStruct = Requirements.requireStruct(this.outboxEventRouter.apply((SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("outboxsmtit.outbox")).get(0)).value(), "test payload");
        Assertions.assertThat(requireStruct.getString("eventType")).isEqualTo("UserCreated");
        Assertions.assertThat(new ObjectMapper().readTree(requireStruct.getString("payload")).get("email").getTextValue()).isEqualTo("gh@mefi.in");
    }

    @Test
    public void shouldSupportAllFeatures() throws Exception {
        startConnectorWithNoSnapshot();
        this.outboxEventRouter = new EventRouter<>();
        HashMap hashMap = new HashMap();
        hashMap.put("table.field.event.schema.version", "version");
        hashMap.put("table.field.event.timestamp", "createdat");
        hashMap.put("table.fields.additional.placement", "version:envelope:eventVersion,aggregatetype:envelope:aggregateType,somebooltype:envelope:someBoolType,somebooltype:header,is_deleted:envelope:deleted");
        this.outboxEventRouter.configure(hashMap);
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add version int not null;", new String[0]);
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add somebooltype boolean not null;", new String[0]);
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add createdat timestamp without time zone not null;", new String[0]);
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add is_deleted boolean default false;", new String[0]);
        TestHelper.execute(createEventInsert(UUID.fromString("f9171eb6-19f3-4579-9206-0e179d2ebad7"), "UserUpdated", "UserEmail", "7bdf2e9e", "{\"email\": \"gh@mefi.in\"}", ", 1, true, TIMESTAMP '2019-03-24 20:52:59'"), new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(1);
        SourceRecord apply = this.outboxEventRouter.apply((SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("outboxsmtit.outbox")).get(0));
        VerifyRecord.assertConnectSchemasAreEqual((String) null, apply.valueSchema(), SchemaBuilder.struct().version(1).field("payload", Json.builder().optional().build()).field("eventType", Schema.STRING_SCHEMA).field("eventVersion", Schema.INT32_SCHEMA).field("aggregateType", Schema.STRING_SCHEMA).field("someBoolType", Schema.BOOLEAN_SCHEMA).field("deleted", Schema.OPTIONAL_BOOLEAN_SCHEMA).build());
        Assertions.assertThat(apply.timestamp()).isEqualTo(1553460779000000L);
        Assertions.assertThat(apply.topic()).isEqualTo("outbox.event.useremail");
        Headers headers = apply.headers();
        Assertions.assertThat(headers.size()).isEqualTo(2);
        Header lastWithName = headers.lastWithName("id");
        Assertions.assertThat(lastWithName.schema()).isEqualTo(Uuid.builder().build());
        Assertions.assertThat(lastWithName.value()).isEqualTo("f9171eb6-19f3-4579-9206-0e179d2ebad7");
        Header lastWithName2 = headers.lastWithName("somebooltype");
        Assertions.assertThat(lastWithName2.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA);
        Assertions.assertThat(lastWithName2.value()).isEqualTo(true);
        Assertions.assertThat(apply.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
        Assertions.assertThat(apply.key()).isEqualTo("7bdf2e9e");
        Struct requireStruct = Requirements.requireStruct(apply.value(), "test envelope");
        Assertions.assertThat(requireStruct.getString("eventType")).isEqualTo("UserUpdated");
        Assertions.assertThat(requireStruct.getString("aggregateType")).isEqualTo("UserEmail");
        Assertions.assertThat(requireStruct.getInt32("eventVersion")).isEqualTo(1);
        Assertions.assertThat(requireStruct.getBoolean("someBoolType")).isEqualTo(true);
        Assertions.assertThat(requireStruct.getBoolean("deleted")).isEqualTo(false);
    }

    @Test
    @FixFor({"DBZ-1320"})
    public void shouldNotProduceTombstoneEventForNullPayload() throws Exception {
        startConnectorWithNoSnapshot();
        this.outboxEventRouter = new EventRouter<>();
        HashMap hashMap = new HashMap();
        hashMap.put("table.field.event.schema.version", "version");
        hashMap.put("table.field.event.timestamp", "createdat");
        hashMap.put("table.fields.additional.placement", "version:envelope:eventVersion,aggregatetype:envelope:aggregateType,somebooltype:envelope:someBoolType,somebooltype:header,is_deleted:envelope:deleted");
        this.outboxEventRouter.configure(hashMap);
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add version int not null;", new String[0]);
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add somebooltype boolean not null;", new String[0]);
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add createdat timestamp without time zone not null;", new String[0]);
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add is_deleted boolean not null default false;", new String[0]);
        TestHelper.execute(createEventInsert(UUID.fromString("a9d76f78-bda6-48d3-97ed-13a146163218"), "UserUpdated", "UserEmail", "a9d76f78", null, ", 1, true, TIMESTAMP '2019-03-24 20:52:59', true"), new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(1);
        SourceRecord apply = this.outboxEventRouter.apply((SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("outboxsmtit.outbox")).get(0));
        Assertions.assertThat(apply.valueSchema()).isNotNull();
        Assertions.assertThat(apply.timestamp()).isEqualTo(1553460779000000L);
        Assertions.assertThat(apply.topic()).isEqualTo("outbox.event.useremail");
        Headers headers = apply.headers();
        Assertions.assertThat(headers.size()).isEqualTo(2);
        Header lastWithName = headers.lastWithName("id");
        Assertions.assertThat(lastWithName.schema()).isEqualTo(Uuid.schema());
        Assertions.assertThat(lastWithName.value()).isEqualTo("a9d76f78-bda6-48d3-97ed-13a146163218");
        Header lastWithName2 = headers.lastWithName("somebooltype");
        Assertions.assertThat(lastWithName2.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA);
        Assertions.assertThat(lastWithName2.value()).isEqualTo(true);
        Assertions.assertThat(apply.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
        Assertions.assertThat(apply.key()).isEqualTo("a9d76f78");
        Assertions.assertThat(apply.value()).isNotNull();
        Assertions.assertThat(((Struct) apply.value()).get("eventType")).isEqualTo("UserUpdated");
        Assertions.assertThat(((Struct) apply.value()).get("payload")).isNull();
    }

    @Test
    @FixFor({"DBZ-1320"})
    public void shouldProduceTombstoneEventForNullPayload() throws Exception {
        startConnectorWithNoSnapshot();
        this.outboxEventRouter = new EventRouter<>();
        HashMap hashMap = new HashMap();
        hashMap.put("table.field.event.schema.version", "version");
        hashMap.put("table.field.event.timestamp", "createdat");
        hashMap.put("route.tombstone.on.empty.payload", "true");
        hashMap.put("table.fields.additional.placement", "version:envelope:eventVersion,aggregatetype:envelope:aggregateType,somebooltype:envelope:someBoolType,somebooltype:header,is_deleted:envelope:deleted");
        this.outboxEventRouter.configure(hashMap);
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add version int not null;", new String[0]);
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add somebooltype boolean not null;", new String[0]);
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add createdat timestamp without time zone not null;", new String[0]);
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox add is_deleted boolean not null default false;", new String[0]);
        TestHelper.execute(createEventInsert(UUID.fromString("a9d76f78-bda6-48d3-97ed-13a146163218"), "UserUpdated", "UserEmail", "a9d76f78", null, ", 1, true, TIMESTAMP '2019-03-24 20:52:59', true"), new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(1);
        SourceRecord apply = this.outboxEventRouter.apply((SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("outboxsmtit.outbox")).get(0));
        Assertions.assertThat(apply.valueSchema()).isNull();
        Assertions.assertThat(apply.timestamp()).isEqualTo(1553460779000000L);
        Assertions.assertThat(apply.topic()).isEqualTo("outbox.event.useremail");
        Headers headers = apply.headers();
        Assertions.assertThat(headers.size()).isEqualTo(2);
        Header lastWithName = headers.lastWithName("id");
        Assertions.assertThat(lastWithName.schema()).isEqualTo(Uuid.schema());
        Assertions.assertThat(lastWithName.value()).isEqualTo("a9d76f78-bda6-48d3-97ed-13a146163218");
        Header lastWithName2 = headers.lastWithName("somebooltype");
        Assertions.assertThat(lastWithName2.schema()).isEqualTo(SchemaBuilder.BOOLEAN_SCHEMA);
        Assertions.assertThat(lastWithName2.value()).isEqualTo(true);
        Assertions.assertThat(apply.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
        Assertions.assertThat(apply.key()).isEqualTo("a9d76f78");
        Assertions.assertThat(apply.value()).isNull();
    }

    @Test
    @FixFor({"DBZ-1320"})
    public void shouldProduceTombstoneEventForEmptyPayload() throws Exception {
        startConnectorWithNoSnapshot();
        this.outboxEventRouter = new EventRouter<>();
        HashMap hashMap = new HashMap();
        hashMap.put("route.tombstone.on.empty.payload", "true");
        this.outboxEventRouter.configure(hashMap);
        TestHelper.execute("ALTER TABLE outboxsmtit.outbox ALTER COLUMN payload SET DATA TYPE VARCHAR(1000);", new String[0]);
        TestHelper.execute(createEventInsert(UUID.fromString("a9d76f78-bda6-48d3-97ed-13a146163218"), "UserUpdated", "UserEmail", "a9d76f78", "", null), new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(1);
        SourceRecord apply = this.outboxEventRouter.apply((SourceRecord) consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("outboxsmtit.outbox")).get(0));
        Assertions.assertThat(apply.valueSchema()).isNull();
        Assertions.assertThat(apply.topic()).isEqualTo("outbox.event.useremail");
        Headers headers = apply.headers();
        Assertions.assertThat(headers.size()).isEqualTo(1);
        Header lastWithName = headers.lastWithName("id");
        Assertions.assertThat(lastWithName.schema()).isEqualTo(Uuid.schema());
        Assertions.assertThat(lastWithName.value()).isEqualTo("a9d76f78-bda6-48d3-97ed-13a146163218");
        Assertions.assertThat(apply.keySchema()).isEqualTo(SchemaBuilder.STRING_SCHEMA);
        Assertions.assertThat(apply.key()).isEqualTo("a9d76f78");
        Assertions.assertThat(apply.value()).isNull();
    }

    private void startConnectorWithInitialSnapshotRecord() throws Exception {
        TestHelper.execute(createEventInsert(UUID.fromString("70f52ae3-f671-4bac-ae62-1b9be6e73700"), "UserCreated", "User", "10711faf", "{}", ""), new String[0]);
        start(PostgresConnector.class, getConfigurationBuilder(PostgresConnectorConfig.SnapshotMode.INITIAL).build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.allRecordsInOrder().size()).isEqualTo(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("outboxsmtit.outbox")).size()).isEqualTo(1);
    }

    private void startConnectorWithNoSnapshot() throws InterruptedException {
        start(PostgresConnector.class, getConfigurationBuilder(PostgresConnectorConfig.SnapshotMode.NEVER).build());
        waitForStreamingRunning("postgres", "test_server");
    }

    private static Configuration.Builder getConfigurationBuilder(PostgresConnectorConfig.SnapshotMode snapshotMode) {
        return TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, snapshotMode.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.SCHEMA_WHITELIST, "outboxsmtit").with(PostgresConnectorConfig.TABLE_WHITELIST, "outboxsmtit\\.outbox");
    }
}
