package io.debezium.connector.postgresql;

import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.AbstractRecordsProducerTest;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDatabaseVersionRule;
import io.debezium.connector.postgresql.junit.SkipWhenDatabaseVersionLessThan;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/* loaded from: input_file:io/debezium/connector/postgresql/RecordsSnapshotProducerIT.class */
public class RecordsSnapshotProducerIT extends AbstractRecordsProducerTest {

    @Rule
    public final TestRule skip = new SkipTestDependingOnDatabaseVersionRule();

    @Before
    public void before() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("init_postgis.ddl");
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.executeDDL("postgis_create_tables.ddl");
    }

    @Test
    public void shouldGenerateSnapshotsForDefaultDatatypes() throws Exception {
        TestHelper.execute(((String) ALL_STMTS.stream().collect(Collectors.joining(";" + System.lineSeparator()))) + ";", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(ALL_STMTS.size(), "public", "Quoted__");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map<String, List<AbstractRecordsProducerTest.SchemaAndValueField>> schemaAndValuesByTopicName = super.schemaAndValuesByTopicName();
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, schemaAndValuesByTopicName);
        });
        Testing.Print.enable();
        while (!testConsumer.isEmpty()) {
            SourceRecord remove = testConsumer.remove();
            assertRecordOffsetAndSnapshotSource(remove, true, testConsumer.isEmpty());
            assertSourceInfo(remove);
        }
    }

    @Test
    public void shouldGenerateSnapshotsForCustomDatatypes() throws Exception {
        TestHelper.execute("INSERT INTO custom_table (lt, i, n, lt_array) VALUES ('Top.Collections.Pictures.Astronomy.Galaxies', '978-0-393-04002-9', NULL, '{\"Ship.Frigate\",\"Ship.Destroyer\"}')", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.custom_table", schemasAndValuesForCustomTypes());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);", new String[0]);
        buildWithStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(2, "s1", "s2");
        testConsumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        testConsumer.clear();
        waitForStreamingToStart();
        TestHelper.execute("INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);", new String[0]);
        testConsumer.expects(2);
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        SourceRecord remove = testConsumer.remove();
        VerifyRecord.isValidInsert(remove, "pk", 2);
        Assert.assertEquals(TestHelper.topicName("s1.a"), remove.topic());
        assertRecordOffsetAndSnapshotSource(remove, false, false);
        assertSourceInfo(remove, "postgres", "s1", "a");
        SourceRecord remove2 = testConsumer.remove();
        VerifyRecord.isValidInsert(remove2, "pk", 2);
        Assert.assertEquals(TestHelper.topicName("s2.a"), remove2.topic());
        assertRecordOffsetAndSnapshotSource(remove2, false, false);
        assertSourceInfo(remove2, "postgres", "s2", "a");
        stopConnector();
        assertConnectorNotRunning();
        TestHelper.execute("INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);", new String[0]);
        int i = 6;
        buildWithStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer2 = testConsumer(6, "s1", "s2");
        testConsumer2.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        testConsumer2.process(sourceRecord -> {
            int andIncrement = atomicInteger.getAndIncrement();
            VerifyRecord.isValidRead(sourceRecord, "pk", (andIncrement % 3) + 1);
            assertRecordOffsetAndSnapshotSource(sourceRecord, true, andIncrement == i - 1);
            assertSourceInfo(sourceRecord);
        });
        testConsumer2.clear();
        waitForStreamingToStart();
        TestHelper.execute("INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);", new String[0]);
        testConsumer2.expects(2);
        testConsumer2.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        SourceRecord remove3 = testConsumer2.remove();
        VerifyRecord.isValidInsert(remove3, "pk", 4);
        assertRecordOffsetAndSnapshotSource(remove3, false, false);
        assertSourceInfo(remove3, "postgres", "s1", "a");
        SourceRecord remove4 = testConsumer2.remove();
        VerifyRecord.isValidInsert(remove4, "pk", 4);
        assertRecordOffsetAndSnapshotSource(remove4, false, false);
        assertSourceInfo(remove4, "postgres", "s2", "a");
    }

    @Test
    @FixFor({"DBZ-859"})
    public void shouldGenerateSnapshotAndSendHeartBeat() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.execute("CREATE TABLE t1 (pk SERIAL, aa integer, PRIMARY KEY(pk)); INSERT INTO t1 VALUES (default, 11)", new String[0]);
        buildWithStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL).with(PostgresConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(Heartbeat.HEARTBEAT_INTERVAL, 300000));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(2, new String[0]);
        testConsumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        SourceRecord remove = testConsumer.remove();
        VerifyRecord.isValidRead(remove, "pk", 1);
        assertRecordOffsetAndSnapshotSource(remove, true, true);
        SourceRecord remove2 = testConsumer.remove();
        Assertions.assertThat(remove2.topic()).startsWith("__debezium-heartbeat");
        assertRecordOffsetAndSnapshotSource(remove2, false, false);
    }

    private void assertReadRecord(SourceRecord sourceRecord, Map<String, List<AbstractRecordsProducerTest.SchemaAndValueField>> map) {
        VerifyRecord.isValidRead(sourceRecord, "pk", 1);
        String replace = sourceRecord.topic().replace("test_server.", "");
        List<AbstractRecordsProducerTest.SchemaAndValueField> list = map.get(replace);
        Assert.assertNotNull("No expected values for " + replace + " found", list);
        assertRecordSchemaAndValues(list, sourceRecord, "after");
    }

    @Test
    @FixFor({"DBZ-342"})
    public void shouldGenerateSnapshotsForDefaultDatatypesAdaptiveMicroseconds() throws Exception {
        TestHelper.execute(((String) ALL_STMTS.stream().collect(Collectors.joining(";" + System.lineSeparator()))) + ";", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(ALL_STMTS.size(), "public", "Quoted__");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map<String, List<AbstractRecordsProducerTest.SchemaAndValueField>> schemaAndValuesByTopicNameAdaptiveTimeMicroseconds = super.schemaAndValuesByTopicNameAdaptiveTimeMicroseconds();
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, schemaAndValuesByTopicNameAdaptiveTimeMicroseconds);
        });
        while (!testConsumer.isEmpty()) {
            SourceRecord remove = testConsumer.remove();
            assertRecordOffsetAndSnapshotSource(remove, true, testConsumer.isEmpty());
            assertSourceInfo(remove);
        }
    }

    @Test
    @FixFor({"DBZ-606"})
    public void shouldGenerateSnapshotsForDecimalDatatypesUsingStringEncoding() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO numeric_decimal_table (d, dzs, dvs, d_nn, n, nzs, nvs, d_int, dzs_int, dvs_int, n_int, nzs_int, nvs_int, d_nan, dzs_nan, dvs_nan, n_nan, nzs_nan, nvs_nan) VALUES (1.1, 10.11, 10.1111, 3.30, 22.22, 22.2, 22.2222, 1, 10, 10, 22, 22, 22, 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN')", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.STRING));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public", "Quoted__");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map<String, List<AbstractRecordsProducerTest.SchemaAndValueField>> schemaAndValuesByTopicNameStringEncodedDecimals = super.schemaAndValuesByTopicNameStringEncodedDecimals();
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, schemaAndValuesByTopicNameStringEncodedDecimals);
        });
        while (!testConsumer.isEmpty()) {
            SourceRecord remove = testConsumer.remove();
            assertRecordOffsetAndSnapshotSource(remove, true, testConsumer.isEmpty());
            assertSourceInfo(remove);
        }
    }

    @Test
    @FixFor({"DBZ-1118"})
    @SkipWhenDatabaseVersionLessThan(SkipWhenDatabaseVersionLessThan.PostgresVersion.POSTGRES_10)
    public void shouldGenerateSnapshotsForPartitionedTables() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.execute("CREATE TABLE first_table (pk integer, user_id integer, PRIMARY KEY(pk));CREATE TABLE partitioned (pk serial, user_id integer, aa integer) PARTITION BY RANGE (user_id);CREATE TABLE partitioned_1_100 PARTITION OF partitioned (CONSTRAINT p_1_100_pk PRIMARY KEY (pk)) FOR VALUES FROM (1) TO (101);CREATE TABLE partitioned_101_200 PARTITION OF partitioned (CONSTRAINT p_101_200_pk PRIMARY KEY (pk)) FOR VALUES FROM (101) TO (201);", new String[0]);
        TestHelper.execute("INSERT INTO first_table (pk, user_id) VALUES (1000, 1);", new String[0]);
        TestHelper.execute("INSERT INTO partitioned (user_id, aa) SELECT RANDOM() * 99 + 1, RANDOM() * 100000 FROM generate_series(1, 10);", new String[0]);
        TestHelper.execute("INSERT INTO partitioned (user_id, aa) SELECT RANDOM() * 99 + 101, RANDOM() * 100000 FROM generate_series(1, 20);", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(61, new String[0]);
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        HashSet hashSet = new HashSet();
        Map hashMapOf = Collect.hashMapOf("test_server.public.first_table", 0, "test_server.public.partitioned", 0, "test_server.public.partitioned_1_100", 0, "test_server.public.partitioned_101_200", 0);
        testConsumer.process(sourceRecord -> {
            Struct struct = (Struct) sourceRecord.key();
            if (struct != null) {
                Integer int32 = struct.getInt32("pk");
                Assertions.assertThat(hashSet).excludes(new Object[]{int32});
                hashSet.add(int32);
            }
            hashMapOf.put(sourceRecord.topic(), Integer.valueOf(((Integer) hashMapOf.get(sourceRecord.topic())).intValue() + 1));
        });
        Assert.assertEquals(31L, hashSet.size());
        Assert.assertEquals(1L, ((Integer) hashMapOf.get("test_server.public.first_table")).intValue());
        Assert.assertEquals(30L, ((Integer) hashMapOf.get("test_server.public.partitioned")).intValue());
        Assert.assertEquals(10L, ((Integer) hashMapOf.get("test_server.public.partitioned_1_100")).intValue());
        Assert.assertEquals(20L, ((Integer) hashMapOf.get("test_server.public.partitioned_101_200")).intValue());
        while (!testConsumer.isEmpty()) {
            SourceRecord remove = testConsumer.remove();
            assertRecordOffsetAndSnapshotSource(remove, true, testConsumer.isEmpty());
            assertSourceInfo(remove);
        }
    }

    @Test
    @FixFor({"DBZ-1162"})
    public void shouldGenerateSnapshotsForHstores() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO hstore_table (hs) VALUES ('\"key\" => \"val\"'::hstore)", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public", "Quoted__");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.hstore_table", schemaAndValueFieldForJsonEncodedHStoreType());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1163"})
    public void shouldGenerateSnapshotForATableWithoutPrimaryKey() throws Exception {
        TestHelper.execute("insert into table_without_pk values(1, 1000)", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public", "Quoted__");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        List asList = Arrays.asList(new AbstractRecordsProducerTest.SchemaAndValueField("id", Schema.INT32_SCHEMA, 1), new AbstractRecordsProducerTest.SchemaAndValueField("val", Schema.OPTIONAL_INT32_SCHEMA, 1000));
        testConsumer.process(sourceRecord -> {
            Assertions.assertThat(sourceRecord.key()).isNull();
            Assert.assertEquals("public.table_without_pk", sourceRecord.topic().replace("test_server.", ""));
            assertRecordSchemaAndValues(asList, sourceRecord, "after");
        });
    }

    @Test
    @FixFor({"DBZ-1193"})
    @SkipWhenDatabaseVersionLessThan(SkipWhenDatabaseVersionLessThan.PostgresVersion.POSTGRES_10)
    public void shouldGenerateSnapshotForMacaddr8Datatype() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.execute("CREATE TABLE macaddr8_table(pk SERIAL, m MACADDR8, PRIMARY KEY(pk));", new String[0]);
        TestHelper.execute("INSERT INTO macaddr8_table (m) VALUES ('08:00:2b:01:02:03:04:05');", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.macaddr8_table", schemaAndValueForMacaddr8Type());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1164"})
    public void shouldGenerateSnapshotForTwentyFourHourTime() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO time_table(ts, tsneg, ts_ms, ts_us, tz, date, ti, tip, ttf, ttz, tptz, it) VALUES ('2016-11-04T13:51:30.123456'::TIMESTAMP, '1936-10-25T22:10:12.608'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456+02:00'::TIMESTAMPTZ, '2016-11-04'::DATE, '13:51:30'::TIME, '13:51:30.123'::TIME, '24:00:00'::TIME, '13:51:30.123789+02:00'::TIMETZ, '13:51:30.123+02:00'::TIMETZ, 'P1Y2M3DT4H5M0S'::INTERVAL)", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.time_table", schemaAndValuesForDateTimeTypes());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1345"})
    public void shouldNotSnapshotMaterializedViews() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.execute("CREATE TABLE mv_real_table (pk SERIAL, i integer, s VARCHAR(50), PRIMARY KEY(pk));", new String[0]);
        TestHelper.execute("CREATE MATERIALIZED VIEW mv (pk, s) AS SELECT mrv.pk, mrv.s FROM mv_real_table mrv WITH DATA;", new String[0]);
        TestHelper.execute("INSERT INTO mv_real_table (i,s) VALUES (1,'1');", new String[0]);
        TestHelper.execute("REFRESH MATERIALIZED VIEW mv WITH DATA;", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.mv_real_table", schemaAndValueForMaterializedViewBaseType());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    private void buildNoStreamProducer(Configuration.Builder builder) {
        start(PostgresConnector.class, builder.with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL_ONLY).with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
    }

    private void buildWithStreamProducer(Configuration.Builder builder) {
        start(PostgresConnector.class, builder.with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.ALWAYS).with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
    }
}
