package io.debezium.connector.postgresql;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.data.Json;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.async.AbstractAsyncEngineConnectorTest;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.transforms.VectorToJsonConverter;
import java.util.List;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

@SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 15, reason = "PgVector is tested only with PostgreSQL 15+")
/* loaded from: input_file:io/debezium/connector/postgresql/PostgresVectorToJsonConverterIT.class */
public class PostgresVectorToJsonConverterIT extends AbstractAsyncEngineConnectorTest {
    private PostgresConnection connection;

    @Before
    public void before() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1;", new String[0]);
        TestHelper.execute("CREATE EXTENSION IF NOT EXISTS vector;", new String[0]);
        this.connection = TestHelper.create();
        this.connection.setAutoCommit(false);
    }

    @After
    public void after() throws Exception {
        stopConnector();
        assertNoRecordsToConsume();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
        if (this.connection != null) {
            this.connection.close();
        }
    }

    @Test
    @FixFor({"DBZ-8571"})
    public void shouldConvertFloatVectorToJson() throws Exception {
        TestHelper.execute("CREATE TABLE s1.dbz8571 (id int primary key, data halfvec(3));", new String[0]);
        TestHelper.execute("ALTER TABLE s1.dbz8571 REPLICA IDENTITY FULL;", new String[0]);
        TestHelper.execute("INSERT INTO s1.dbz8571 (id,data) values (1,'[101,102,103]');", new String[0]);
        start(PostgresConnector.class, getConfigurationWithTransform());
        assertConnectorIsRunning();
        waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
        TestHelper.execute("INSERT INTO s1.dbz8571 (id,data) values (2,'[1,2,3]');", new String[0]);
        TestHelper.execute("UPDATE s1.dbz8571 set data = '[5,7,9]' WHERE id = 2;", new String[0]);
        TestHelper.execute("DELETE FROM s1.dbz8571 WHERE id = 2", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(5).recordsForTopic(topicName());
        Assertions.assertThat(recordsForTopic).hasSize(5);
        assertRead((SourceRecord) recordsForTopic.get(0), 1, "{ \"values\": [101.0, 102.0, 103.0] }");
        assertInsert((SourceRecord) recordsForTopic.get(1), 2, "{ \"values\": [1.0, 2.0, 3.0] }");
        assertUpdate((SourceRecord) recordsForTopic.get(2), 2, "{ \"values\": [1.0, 2.0, 3.0] }", "{ \"values\": [5.0, 7.0, 9.0] }");
        assertDelete((SourceRecord) recordsForTopic.get(3), 2, "{ \"values\": [5.0, 7.0, 9.0] }");
        assertTombstone((SourceRecord) recordsForTopic.get(4), 2);
    }

    @Test
    @FixFor({"DBZ-8571"})
    public void shouldConvertDoubleVectorToJson() throws Exception {
        TestHelper.execute("CREATE TABLE s1.dbz8571 (id int primary key, data vector(3));", new String[0]);
        TestHelper.execute("ALTER TABLE s1.dbz8571 REPLICA IDENTITY FULL;", new String[0]);
        TestHelper.execute("INSERT INTO s1.dbz8571 (id,data) values (1,'[101,102,103]');", new String[0]);
        start(PostgresConnector.class, getConfigurationWithTransform());
        assertConnectorIsRunning();
        waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
        TestHelper.execute("INSERT INTO s1.dbz8571 (id,data) values (2,'[1,2,3]');", new String[0]);
        TestHelper.execute("UPDATE s1.dbz8571 set data = '[5,7,9]' WHERE id = 2;", new String[0]);
        TestHelper.execute("DELETE FROM s1.dbz8571 WHERE id = 2", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(5).recordsForTopic(topicName());
        Assertions.assertThat(recordsForTopic).hasSize(5);
        assertRead((SourceRecord) recordsForTopic.get(0), 1, "{ \"values\": [101.0, 102.0, 103.0] }");
        assertInsert((SourceRecord) recordsForTopic.get(1), 2, "{ \"values\": [1.0, 2.0, 3.0] }");
        assertUpdate((SourceRecord) recordsForTopic.get(2), 2, "{ \"values\": [1.0, 2.0, 3.0] }", "{ \"values\": [5.0, 7.0, 9.0] }");
        assertDelete((SourceRecord) recordsForTopic.get(3), 2, "{ \"values\": [5.0, 7.0, 9.0] }");
        assertTombstone((SourceRecord) recordsForTopic.get(4), 2);
    }

    @Test
    @FixFor({"DBZ-8571"})
    public void shouldConvertSparseVectorToJson() throws Exception {
        TestHelper.execute("CREATE TABLE s1.dbz8571 (id int primary key, data sparsevec(25));", new String[0]);
        TestHelper.execute("ALTER TABLE s1.dbz8571 REPLICA IDENTITY FULL;", new String[0]);
        TestHelper.execute("INSERT INTO s1.dbz8571 (id,data) values (1,'{1: 25, 2: 15, 10: 100}/25');", new String[0]);
        start(PostgresConnector.class, getConfigurationWithTransform());
        assertConnectorIsRunning();
        waitForStreamingRunning("postgres", TestHelper.TEST_SERVER);
        TestHelper.execute("INSERT INTO s1.dbz8571 (id,data) values (2,'{2: 10, 5: 20, 20: 30}/25');", new String[0]);
        TestHelper.execute("UPDATE s1.dbz8571 set data = '{1:5,2:10,3:25}/25' WHERE id = 2;", new String[0]);
        TestHelper.execute("DELETE FROM s1.dbz8571 WHERE id = 2", new String[0]);
        List recordsForTopic = consumeRecordsByTopic(5).recordsForTopic(topicName());
        Assertions.assertThat(recordsForTopic).hasSize(5);
        assertRead((SourceRecord) recordsForTopic.get(0), 1, "{ \"dimensions\": 25, \"vector\": { \"1\": 25.0, \"2\": 15.0, \"10\": 100.0 } }");
        assertInsert((SourceRecord) recordsForTopic.get(1), 2, "{ \"dimensions\": 25, \"vector\": { \"2\": 10.0, \"5\": 20.0, \"20\": 30.0 } }");
        assertUpdate((SourceRecord) recordsForTopic.get(2), 2, "{ \"dimensions\": 25, \"vector\": { \"2\": 10.0, \"5\": 20.0, \"20\": 30.0 } }", "{ \"dimensions\": 25, \"vector\": { \"1\": 5.0, \"2\": 10.0, \"3\": 25.0 } }");
        assertDelete((SourceRecord) recordsForTopic.get(3), 2, "{ \"dimensions\": 25, \"vector\": { \"1\": 5.0, \"2\": 10.0, \"3\": 25.0 } }");
        assertTombstone((SourceRecord) recordsForTopic.get(4), 2);
    }

    protected Configuration getConfigurationWithTransform() {
        return TestHelper.defaultConfig().with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1\\.dbz8571").with("transforms", "vectortojson").with("transforms.vectortojson.type", VectorToJsonConverter.class.getName()).build();
    }

    protected void assertRead(SourceRecord sourceRecord, int i, String str) {
        VerifyRecord.isValidRead(sourceRecord, "id", i);
        assertFieldIsJsonSchema(getAfter(sourceRecord), "data");
        assertFieldIsJson(getAfter(sourceRecord), "data", str);
    }

    protected void assertInsert(SourceRecord sourceRecord, int i, String str) {
        VerifyRecord.isValidInsert(sourceRecord, "id", i);
        assertFieldIsJsonSchema(getAfter(sourceRecord), "data");
        assertFieldIsJson(getAfter(sourceRecord), "data", str);
    }

    protected void assertUpdate(SourceRecord sourceRecord, int i, String str, String str2) {
        VerifyRecord.isValidUpdate(sourceRecord, "id", i);
        assertFieldIsJsonSchema(getBefore(sourceRecord), "data");
        assertFieldIsJson(getBefore(sourceRecord), "data", str);
        assertFieldIsJsonSchema(getAfter(sourceRecord), "data");
        assertFieldIsJson(getAfter(sourceRecord), "data", str2);
    }

    protected void assertDelete(SourceRecord sourceRecord, int i, String str) {
        VerifyRecord.isValidDelete(sourceRecord, "id", i);
        assertFieldIsJsonSchema(getBefore(sourceRecord), "data");
        assertFieldIsJson(getBefore(sourceRecord), "data", str);
    }

    protected void assertTombstone(SourceRecord sourceRecord, int i) {
        VerifyRecord.isValidTombstone(sourceRecord, "id", i);
    }

    protected Struct getBefore(SourceRecord sourceRecord) {
        return ((Struct) sourceRecord.value()).getStruct("before");
    }

    protected Struct getAfter(SourceRecord sourceRecord) {
        return ((Struct) sourceRecord.value()).getStruct("after");
    }

    protected void assertFieldIsJsonSchema(Struct struct, String str) {
        Assertions.assertThat(struct.schema().field(str).schema()).isEqualTo(struct.schema().isOptional() ? Json.builder().optional().build() : Json.schema());
    }

    protected void assertFieldIsJson(Struct struct, String str, String str2) {
        Assertions.assertThat(struct.get(str)).isEqualTo(str2);
    }

    protected String topicName() {
        return "test_server.s1.dbz8571";
    }
}
