package io.debezium.connector.postgresql.transforms;

import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.PostgresConnector;
import io.debezium.connector.postgresql.TestHelper;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot;
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import junit.framework.TestCase;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/* loaded from: input_file:io/debezium/connector/postgresql/transforms/DecodeLogicalDecodingMessageContentTest.class */
public class DecodeLogicalDecodingMessageContentTest extends AbstractConnectorTest {

    @Rule
    public final TestRule skipName = new SkipTestDependingOnDecoderPluginNameRule();
    private DecodeLogicalDecodingMessageContent<SourceRecord> decodeLogicalDecodingMessageContent;

    @Before
    public void beforeEach() throws Exception {
        start(PostgresConnector.class, TestHelper.defaultConfig().build());
        assertConnectorIsRunning();
        waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
        this.decodeLogicalDecodingMessageContent = new DecodeLogicalDecodingMessageContent<>();
        this.decodeLogicalDecodingMessageContent.configure(Collections.emptyMap());
    }

    @After
    public void afterEach() throws Exception {
        stopConnector();
        assertNoRecordsToConsume();
        this.decodeLogicalDecodingMessageContent.close();
    }

    @After
    public void after() {
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
    }

    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
    @FixFor({"DBZ-8103"})
    public void shouldFailWhenLogicalDecodingMessageContentIsEmptyString() throws Exception {
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', '');", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(TestHelper.topicName("message"));
        Assertions.assertThat(recordsForTopic).hasSize(1);
        Assertions.assertThat(((Exception) Assert.assertThrows(DebeziumException.class, () -> {
            this.decodeLogicalDecodingMessageContent.apply((SourceRecord) recordsForTopic.get(0));
        })).getMessage()).isEqualTo("Unable to parse logical decoding message content JSON string ''");
    }

    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
    @FixFor({"DBZ-8103"})
    public void shouldConvertRecordWithNonTransactionalLogicalDecodingMessageWithEmptyContent() throws Exception {
        TestHelper.execute("SELECT pg_logical_emit_message(false, 'foo', '{}');", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(TestHelper.topicName("message"));
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecord apply = this.decodeLogicalDecodingMessageContent.apply((SourceRecord) recordsForTopic.get(0));
        Assertions.assertThat(apply).isNotNull();
        Struct struct = (Struct) apply.value();
        String string = struct.getString("op");
        Struct struct2 = struct.getStruct("source");
        Struct struct3 = struct.getStruct("after");
        Assert.assertNull(struct2.getInt64("txId"));
        Assert.assertNotNull(struct2.getInt64("ts_ms"));
        Assert.assertNotNull(struct2.getInt64("lsn"));
        TestCase.assertEquals("", struct2.getString("table"));
        TestCase.assertEquals("", struct2.getString("schema"));
        TestCase.assertEquals(Envelope.Operation.CREATE.code(), string);
        TestCase.assertEquals(0, struct3.schema().fields().size());
        Assert.assertTrue(struct.schema().fields().stream().noneMatch(field -> {
            return field.name().equals("message");
        }));
    }

    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
    @FixFor({"DBZ-8103"})
    public void shouldConvertRecordWithTransactionalLogicalDecodingMessageWithContentNotIncludingNullField() throws Exception {
        TestHelper.execute("SELECT pg_logical_emit_message(true, 'foo', '{\"bar\": \"baz\", \"qux\": 9703, \"quux\": null}');", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(TestHelper.topicName("message"));
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecord apply = this.decodeLogicalDecodingMessageContent.apply((SourceRecord) recordsForTopic.get(0));
        Assertions.assertThat(apply).isNotNull();
        Struct struct = (Struct) apply.value();
        String string = struct.getString("op");
        Struct struct2 = struct.getStruct("source");
        Struct struct3 = struct.getStruct("after");
        Assert.assertNotNull(struct2.getInt64("txId"));
        Assert.assertNotNull(struct2.getInt64("ts_ms"));
        Assert.assertNotNull(struct2.getInt64("lsn"));
        TestCase.assertEquals("", struct2.getString("table"));
        TestCase.assertEquals("", struct2.getString("schema"));
        TestCase.assertEquals(Envelope.Operation.CREATE.code(), string);
        TestCase.assertEquals(2, struct3.schema().fields().size());
        TestCase.assertEquals("baz", struct3.get("bar"));
        TestCase.assertEquals(9703, struct3.get("qux"));
        Assert.assertTrue(struct.schema().fields().stream().noneMatch(field -> {
            return field.name().equals("message");
        }));
    }

    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 14, minor = 0, reason = "Message not supported for PG version < 14")
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Only supported on PgOutput")
    @FixFor({"DBZ-8103"})
    public void shouldConvertRecordWithTransactionalLogicalDecodingMessageWithContentIncludingNullField() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("fields.null.include", true);
        this.decodeLogicalDecodingMessageContent.configure(hashMap);
        TestHelper.execute("SELECT pg_logical_emit_message(true, 'foo', '{\"bar\": \"baz\", \"qux\": 9703, \"quux\": null}');", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(TestHelper.topicName("message"));
        Assertions.assertThat(recordsForTopic).hasSize(1);
        SourceRecord apply = this.decodeLogicalDecodingMessageContent.apply((SourceRecord) recordsForTopic.get(0));
        Assertions.assertThat(apply).isNotNull();
        Struct struct = (Struct) apply.value();
        String string = struct.getString("op");
        Struct struct2 = struct.getStruct("source");
        Struct struct3 = struct.getStruct("after");
        Assert.assertNotNull(struct2.getInt64("txId"));
        Assert.assertNotNull(struct2.getInt64("ts_ms"));
        Assert.assertNotNull(struct2.getInt64("lsn"));
        TestCase.assertEquals("", struct2.getString("table"));
        TestCase.assertEquals("", struct2.getString("schema"));
        TestCase.assertEquals(Envelope.Operation.CREATE.code(), string);
        TestCase.assertEquals(3, struct3.schema().fields().size());
        TestCase.assertEquals("baz", struct3.get("bar"));
        TestCase.assertEquals(9703, struct3.get("qux"));
        Assert.assertNull(struct3.get("quux"));
        Assert.assertTrue(struct.schema().fields().stream().noneMatch(field -> {
            return field.name().equals("message");
        }));
    }
}
