package io.debezium.connector.postgresql;

import io.debezium.connector.postgresql.AbstractRecordsProducerTest;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.junit.ConditionalFail;
import io.debezium.relational.TableId;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import junit.framework.TestCase;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/* loaded from: input_file:io/debezium/connector/postgresql/PublicGeometryIT.class */
public class PublicGeometryIT extends AbstractRecordsProducerTest {
    private RecordsStreamProducer recordsProducer;
    private AbstractRecordsProducerTest.TestConsumer consumer;
    private final Consumer<Throwable> blackHole = th -> {
    };

    @Rule
    public final TestRule skip = new SkipTestDependingOnDecoderPluginNameRule();

    @Rule
    public TestRule conditionalFail = new ConditionalFail();

    @Before
    public void before() throws Exception {
        PostgresConnection create = TestHelper.create();
        Throwable th = null;
        try {
            create.dropReplicationSlot("debezium");
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    create.close();
                }
            }
            TestHelper.dropAllSchemas();
            TestHelper.execute("DROP SCHEMA IF EXISTS postgis CASCADE;", "CREATE EXTENSION IF NOT EXISTS postgis SCHEMA public;", "CREATE TABLE public.postgis_table (pk SERIAL, p GEOMETRY(POINT,3187), ml GEOGRAPHY(MULTILINESTRING), PRIMARY KEY(pk));", "CREATE TABLE public.postgis_array_table (pk SERIAL, ga GEOMETRY[], gann GEOMETRY[] NOT NULL, PRIMARY KEY(pk));", "CREATE TABLE public.dummy_table (pk SERIAL, PRIMARY KEY(pk));");
            setupRecordsProducer(new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, false).build()));
        } catch (Throwable th3) {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    @After
    public void after() throws Exception {
        if (this.recordsProducer != null) {
            this.recordsProducer.stop();
        }
    }

    @Test(timeout = 30000)
    @FixFor({"DBZ-1144"})
    public void shouldReceiveChangesForInsertsWithPostgisTypes() throws Exception {
        this.consumer = testConsumer(1, "public");
        this.consumer.setIgnoreExtraRecords(true);
        this.recordsProducer.start(this.consumer, this.blackHole);
        TestHelper.execute("INSERT INTO public.dummy_table DEFAULT VALUES;", new String[0]);
        this.consumer.await(TestHelper.waitTimeForRecords() * 10, TimeUnit.SECONDS);
        while (true) {
            if (!this.consumer.isEmpty() && this.consumer.remove().topic().endsWith(".public.dummy_table")) {
                this.consumer.expects(1);
                assertInsert("INSERT INTO public.postgis_table (p, ml) VALUES ('SRID=3187;POINT(174.9479 -36.7208)'::geometry, 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::geography)", 1, schemaAndValuesForPostgisTypes());
                this.consumer.expects(1);
                assertInsert("INSERT INTO public.postgis_array_table (ga, gann) VALUES (ARRAY['GEOMETRYCOLLECTION EMPTY'::geometry, 'POLYGON((166.51 -46.64, 178.52 -46.64, 178.52 -34.45, 166.51 -34.45, 166.51 -46.64))'::geometry], ARRAY['GEOMETRYCOLLECTION EMPTY'::geometry, 'POLYGON((166.51 -46.64, 178.52 -46.64, 178.52 -34.45, 166.51 -34.45, 166.51 -46.64))'::geometry])", 1, schemaAndValuesForPostgisArrayTypes());
                return;
            }
        }
    }

    private void setupRecordsProducer(PostgresConnectorConfig postgresConnectorConfig) {
        if (this.recordsProducer != null) {
            this.recordsProducer.stop();
        }
        this.recordsProducer = new RecordsStreamProducer(new PostgresTaskContext(postgresConnectorConfig, TestHelper.getSchema(postgresConnectorConfig), PostgresTopicSelector.create(postgresConnectorConfig)), new SourceInfo(postgresConnectorConfig.getLogicalName(), postgresConnectorConfig.databaseName()));
    }

    private void assertInsert(String str, Integer num, List<AbstractRecordsProducerTest.SchemaAndValueField> list) {
        TableId tableIdFromInsertStmt = tableIdFromInsertStmt(str);
        String replaceAll = (tableIdFromInsertStmt.schema() + "." + tableIdFromInsertStmt.table()).replaceAll("[ \"]", "_");
        try {
            executeAndWait(str);
            SourceRecord assertRecordInserted = assertRecordInserted(replaceAll, num != null ? "pk" : null, num);
            assertRecordOffsetAndSnapshotSource(assertRecordInserted, false, false);
            assertSourceInfo(assertRecordInserted, "postgres", tableIdFromInsertStmt.schema(), tableIdFromInsertStmt.table());
            assertRecordSchemaAndValues(list, assertRecordInserted, "after");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private SourceRecord assertRecordInserted(String str, String str2, Integer num) throws InterruptedException {
        Assert.assertFalse("records not generated", this.consumer.isEmpty());
        SourceRecord remove = this.consumer.remove();
        TestCase.assertEquals(TestHelper.topicName(str), remove.topic());
        if (num != null) {
            VerifyRecord.isValidInsert(remove, str2, num.intValue());
        } else {
            VerifyRecord.isValidInsert(remove);
        }
        return remove;
    }

    private void executeAndWait(String str) throws Exception {
        TestHelper.execute(str, new String[0]);
        this.consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
    }
}
