package io.debezium.connector.postgresql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.AbstractRecordsProducerTest;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.data.Bits;
import io.debezium.data.Enum;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipTestRule;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.spi.converter.ConvertedField;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import io.debezium.util.Collect;
import io.debezium.util.Testing;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.fest.assertions.Assertions;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/postgresql/RecordsSnapshotProducerIT.class */
public class RecordsSnapshotProducerIT extends AbstractRecordsProducerTest {

    @Rule
    public final SkipTestRule skip = new SkipTestRule();

    /* loaded from: input_file:io/debezium/connector/postgresql/RecordsSnapshotProducerIT$CustomDatatypeConverter.class */
    public static class CustomDatatypeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {
        private SchemaBuilder isbnSchema;

        public void configure(Properties properties) {
            this.isbnSchema = SchemaBuilder.string().name(properties.getProperty("schema.name"));
        }

        public void converterFor(RelationalColumn relationalColumn, CustomConverter.ConverterRegistration<SchemaBuilder> converterRegistration) {
            if ("isbn".equals(relationalColumn.typeName())) {
                converterRegistration.register(this.isbnSchema, obj -> {
                    return obj.toString();
                });
            }
        }

        public /* bridge */ /* synthetic */ void converterFor(ConvertedField convertedField, CustomConverter.ConverterRegistration converterRegistration) {
            converterFor((RelationalColumn) convertedField, (CustomConverter.ConverterRegistration<SchemaBuilder>) converterRegistration);
        }
    }

    @Before
    public void before() throws Exception {
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("init_postgis.ddl");
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.executeDDL("postgis_create_tables.ddl");
    }

    @Test
    public void shouldGenerateSnapshotsForDefaultDatatypes() throws Exception {
        TestHelper.execute(((String) ALL_STMTS.stream().collect(Collectors.joining(";" + System.lineSeparator()))) + ";", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(ALL_STMTS.size(), "public", "Quoted__");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map<String, List<AbstractRecordsProducerTest.SchemaAndValueField>> schemaAndValuesByTopicName = super.schemaAndValuesByTopicName();
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, schemaAndValuesByTopicName);
        });
        Testing.Print.enable();
        while (!testConsumer.isEmpty()) {
            SourceRecord remove = testConsumer.remove();
            assertRecordOffsetAndSnapshotSource(remove, true, testConsumer.isEmpty());
            assertSourceInfo(remove);
        }
    }

    @Override // io.debezium.connector.postgresql.AbstractRecordsProducerTest
    protected List<AbstractRecordsProducerTest.SchemaAndValueField> schemasAndValuesForCustomConverterTypes() {
        return Arrays.asList(new AbstractRecordsProducerTest.SchemaAndValueField("i", SchemaBuilder.string().name("io.debezium.postgresql.type.Isbn").build(), "0-393-04002-X"));
    }

    @Test
    @FixFor({"DBZ-1134"})
    public void shouldUseCustomConverter() throws Exception {
        TestHelper.execute("INSERT INTO custom_table (lt, i, n, lt_array) VALUES ('Top.Collections.Pictures.Astronomy.Galaxies', '978-0-393-04002-9', NULL, '{\"Ship.Frigate\",\"Ship.Destroyer\"}')", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).with("converters", "first").with("first.type", CustomDatatypeConverter.class.getName()).with("first.schema.name", "io.debezium.postgresql.type.Isbn"));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.custom_table", schemasAndValuesForCustomConverterTypes());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
        waitForSnapshotToBeCompleted();
        TestHelper.execute("CREATE TABLE conv_table (pk serial, i isbn NOT NULL, PRIMARY KEY(pk))", new String[0]);
        TestHelper.execute("INSERT INTO conv_table VALUES (default, '978-0-393-04002-9')", new String[0]);
        Map hashMapOf2 = Collect.hashMapOf("public.conv_table", Arrays.asList(new AbstractRecordsProducerTest.SchemaAndValueField("i", SchemaBuilder.string().name("io.debezium.postgresql.type.Isbn").build(), "0-393-04002-X")));
        testConsumer.clear();
        testConsumer.expects(1);
        testConsumer.process(sourceRecord2 -> {
            assertReadRecord(sourceRecord2, hashMapOf2);
        });
        TestHelper.execute("ALTER TABLE conv_table ALTER COLUMN i TYPE varchar(32)", new String[0]);
        TestHelper.execute("INSERT INTO conv_table VALUES (default, '978-0-393-04002-9')", new String[0]);
        Map hashMapOf3 = Collect.hashMapOf("public.conv_table", Arrays.asList(new AbstractRecordsProducerTest.SchemaAndValueField("i", Schema.STRING_SCHEMA, "0-393-04002-X")));
        testConsumer.clear();
        testConsumer.expects(1);
        testConsumer.process(sourceRecord3 -> {
            assertReadRecord(sourceRecord3, hashMapOf3);
        });
    }

    @Test
    public void shouldGenerateSnapshotsForCustomDatatypes() throws Exception {
        TestHelper.execute("INSERT INTO custom_table (lt, i, n, lt_array) VALUES ('Top.Collections.Pictures.Astronomy.Galaxies', '978-0-393-04002-9', NULL, '{\"Ship.Frigate\",\"Ship.Destroyer\"}')", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.custom_table", schemasAndValuesForCustomTypes());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    public void shouldGenerateSnapshotAndContinueStreaming() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" + "INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);", new String[0]);
        buildWithStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(2, "s1", "s2");
        waitForSnapshotToBeCompleted();
        testConsumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        testConsumer.clear();
        waitForStreamingToStart();
        TestHelper.execute("INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);", new String[0]);
        testConsumer.expects(2);
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        SourceRecord remove = testConsumer.remove();
        VerifyRecord.isValidInsert(remove, "pk", 2);
        Assert.assertEquals(TestHelper.topicName("s1.a"), remove.topic());
        assertRecordOffsetAndSnapshotSource(remove, false, false);
        assertSourceInfo(remove, "postgres", "s1", "a");
        SourceRecord remove2 = testConsumer.remove();
        VerifyRecord.isValidInsert(remove2, "pk", 2);
        Assert.assertEquals(TestHelper.topicName("s2.a"), remove2.topic());
        assertRecordOffsetAndSnapshotSource(remove2, false, false);
        assertSourceInfo(remove2, "postgres", "s2", "a");
        stopConnector();
        assertConnectorNotRunning();
        TestHelper.execute("INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);", new String[0]);
        int i = 6;
        buildWithStreamProducer(TestHelper.defaultConfig());
        waitForSnapshotToBeCompleted();
        AbstractRecordsProducerTest.TestConsumer testConsumer2 = testConsumer(6, "s1", "s2");
        testConsumer2.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        testConsumer2.process(sourceRecord -> {
            int andIncrement = atomicInteger.getAndIncrement();
            VerifyRecord.isValidRead(sourceRecord, "pk", (andIncrement % 3) + 1);
            assertRecordOffsetAndSnapshotSource(sourceRecord, true, andIncrement == i - 1);
            assertSourceInfo(sourceRecord);
        });
        testConsumer2.clear();
        waitForStreamingToStart();
        TestHelper.execute("INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);", new String[0]);
        testConsumer2.expects(2);
        testConsumer2.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        SourceRecord remove3 = testConsumer2.remove();
        VerifyRecord.isValidInsert(remove3, "pk", 4);
        assertRecordOffsetAndSnapshotSource(remove3, false, false);
        assertSourceInfo(remove3, "postgres", "s1", "a");
        SourceRecord remove4 = testConsumer2.remove();
        VerifyRecord.isValidInsert(remove4, "pk", 4);
        assertRecordOffsetAndSnapshotSource(remove4, false, false);
        assertSourceInfo(remove4, "postgres", "s2", "a");
    }

    @Test
    @FixFor({"DBZ-1564"})
    public void shouldCloseTransactionsAfterSnapshot() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, PRIMARY KEY(pk));" + "INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);", new String[0]);
        buildWithStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(2, "s1", "s2");
        waitForSnapshotToBeCompleted();
        testConsumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        testConsumer.clear();
        waitForStreamingToStart();
        TestHelper.assertNoOpenTransactions();
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-859"})
    public void shouldGenerateSnapshotAndSendHeartBeat() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.execute("CREATE TABLE t1 (pk SERIAL, aa integer, PRIMARY KEY(pk)); INSERT INTO t1 VALUES (default, 11)", new String[0]);
        buildWithStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL).with(PostgresConnectorConfig.INCLUDE_SCHEMA_CHANGES, true).with(Heartbeat.HEARTBEAT_INTERVAL, 300000));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(2, new String[0]);
        testConsumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        SourceRecord remove = testConsumer.remove();
        VerifyRecord.isValidRead(remove, "pk", 1);
        assertRecordOffsetAndSnapshotSource(remove, true, true);
        SourceRecord remove2 = testConsumer.remove();
        Assertions.assertThat(remove2.topic()).startsWith("__debezium-heartbeat");
        assertRecordOffsetAndSnapshotSource(remove2, false, false);
    }

    private void assertReadRecord(SourceRecord sourceRecord, Map<String, List<AbstractRecordsProducerTest.SchemaAndValueField>> map) {
        VerifyRecord.isValidRead(sourceRecord, "pk", 1);
        String replace = sourceRecord.topic().replace("test_server.", "");
        List<AbstractRecordsProducerTest.SchemaAndValueField> list = map.get(replace);
        Assert.assertNotNull("No expected values for " + replace + " found", list);
        assertRecordSchemaAndValues(list, sourceRecord, "after");
    }

    @Test
    @FixFor({"DBZ-342"})
    public void shouldGenerateSnapshotsForDefaultDatatypesAdaptiveMicroseconds() throws Exception {
        TestHelper.execute(((String) ALL_STMTS.stream().collect(Collectors.joining(";" + System.lineSeparator()))) + ";", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(ALL_STMTS.size(), "public", "Quoted__");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map<String, List<AbstractRecordsProducerTest.SchemaAndValueField>> schemaAndValuesByTopicNameAdaptiveTimeMicroseconds = super.schemaAndValuesByTopicNameAdaptiveTimeMicroseconds();
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, schemaAndValuesByTopicNameAdaptiveTimeMicroseconds);
        });
        while (!testConsumer.isEmpty()) {
            SourceRecord remove = testConsumer.remove();
            assertRecordOffsetAndSnapshotSource(remove, true, testConsumer.isEmpty());
            assertSourceInfo(remove);
        }
    }

    @Test
    @FixFor({"DBZ-606"})
    public void shouldGenerateSnapshotsForDecimalDatatypesUsingStringEncoding() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO numeric_decimal_table (d, dzs, dvs, d_nn, n, nzs, nvs, d_int, dzs_int, dvs_int, n_int, nzs_int, nvs_int, d_nan, dzs_nan, dvs_nan, n_nan, nzs_nan, nvs_nan) VALUES (1.1, 10.11, 10.1111, 3.30, 22.22, 22.2, 22.2222, 1, 10, 10, 22, 22, 22, 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN')", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.STRING));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public", "Quoted__");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map<String, List<AbstractRecordsProducerTest.SchemaAndValueField>> schemaAndValuesByTopicNameStringEncodedDecimals = super.schemaAndValuesByTopicNameStringEncodedDecimals();
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, schemaAndValuesByTopicNameStringEncodedDecimals);
        });
        while (!testConsumer.isEmpty()) {
            SourceRecord remove = testConsumer.remove();
            assertRecordOffsetAndSnapshotSource(remove, true, testConsumer.isEmpty());
            assertSourceInfo(remove);
        }
    }

    @Test
    @FixFor({"DBZ-1118"})
    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 10, reason = "Database version is less than 10.0")
    public void shouldGenerateSnapshotsForPartitionedTables() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.execute("CREATE TABLE first_table (pk integer, user_id integer, PRIMARY KEY(pk));CREATE TABLE partitioned (pk serial, user_id integer, aa integer) PARTITION BY RANGE (user_id);CREATE TABLE partitioned_1_100 PARTITION OF partitioned (CONSTRAINT p_1_100_pk PRIMARY KEY (pk)) FOR VALUES FROM (1) TO (101);CREATE TABLE partitioned_101_200 PARTITION OF partitioned (CONSTRAINT p_101_200_pk PRIMARY KEY (pk)) FOR VALUES FROM (101) TO (201);", new String[0]);
        TestHelper.execute("INSERT INTO first_table (pk, user_id) VALUES (1000, 1);", new String[0]);
        TestHelper.execute("INSERT INTO partitioned (user_id, aa) SELECT RANDOM() * 99 + 1, RANDOM() * 100000 FROM generate_series(1, 10);", new String[0]);
        TestHelper.execute("INSERT INTO partitioned (user_id, aa) SELECT RANDOM() * 99 + 101, RANDOM() * 100000 FROM generate_series(1, 20);", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(31, new String[0]);
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        HashSet hashSet = new HashSet();
        Map hashMapOf = Collect.hashMapOf("test_server.public.first_table", 0, "test_server.public.partitioned", 0, "test_server.public.partitioned_1_100", 0, "test_server.public.partitioned_101_200", 0);
        testConsumer.process(sourceRecord -> {
            Struct struct = (Struct) sourceRecord.key();
            if (struct != null) {
                Integer int32 = struct.getInt32("pk");
                Assertions.assertThat(hashSet).excludes(new Object[]{int32});
                hashSet.add(int32);
            }
            hashMapOf.put(sourceRecord.topic(), Integer.valueOf(((Integer) hashMapOf.get(sourceRecord.topic())).intValue() + 1));
        });
        Assert.assertEquals(31L, hashSet.size());
        Assert.assertEquals(1L, ((Integer) hashMapOf.get("test_server.public.first_table")).intValue());
        Assert.assertEquals(0L, ((Integer) hashMapOf.get("test_server.public.partitioned")).intValue());
        Assert.assertEquals(10L, ((Integer) hashMapOf.get("test_server.public.partitioned_1_100")).intValue());
        Assert.assertEquals(20L, ((Integer) hashMapOf.get("test_server.public.partitioned_101_200")).intValue());
        while (!testConsumer.isEmpty()) {
            SourceRecord remove = testConsumer.remove();
            assertRecordOffsetAndSnapshotSource(remove, true, testConsumer.isEmpty());
            assertSourceInfo(remove);
        }
    }

    @Test
    @FixFor({"DBZ-1162"})
    public void shouldGenerateSnapshotsForHstores() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO hstore_table (hs) VALUES ('\"key\" => \"val\"'::hstore)", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public", "Quoted__");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.hstore_table", schemaAndValueFieldForJsonEncodedHStoreType());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1163"})
    public void shouldGenerateSnapshotForATableWithoutPrimaryKey() throws Exception {
        TestHelper.execute("insert into table_without_pk values(1, 1000)", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public", "Quoted__");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        List asList = Arrays.asList(new AbstractRecordsProducerTest.SchemaAndValueField("id", Schema.INT32_SCHEMA, 1), new AbstractRecordsProducerTest.SchemaAndValueField("val", Schema.OPTIONAL_INT32_SCHEMA, 1000));
        testConsumer.process(sourceRecord -> {
            Assertions.assertThat(sourceRecord.key()).isNull();
            Assert.assertEquals("public.table_without_pk", sourceRecord.topic().replace("test_server.", ""));
            assertRecordSchemaAndValues(asList, sourceRecord, "after");
        });
    }

    @Test
    @FixFor({"DBZ-1193"})
    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 10, reason = "MACADDR8 data type is only supported since Postgres 10+")
    public void shouldGenerateSnapshotForMacaddr8Datatype() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.execute("CREATE TABLE macaddr8_table(pk SERIAL, m MACADDR8, PRIMARY KEY(pk));", new String[0]);
        TestHelper.execute("INSERT INTO macaddr8_table (m) VALUES ('08:00:2b:01:02:03:04:05');", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.macaddr8_table", schemaAndValueForMacaddr8Type());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1164"})
    public void shouldGenerateSnapshotForTwentyFourHourTime() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO time_table(ts, tsneg, ts_ms, ts_us, tz, date, ti, tip, ttf, ttz, tptz, it, ts_large, ts_large_us, ts_large_ms, tz_large, ts_max, ts_min, tz_max, tz_min, ts_pinf, ts_ninf, tz_pinf, tz_ninf) VALUES ('2016-11-04T13:51:30.123456'::TIMESTAMP, '1936-10-25T22:10:12.608'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456+02:00'::TIMESTAMPTZ, '2016-11-04'::DATE, '13:51:30'::TIME, '13:51:30.123'::TIME, '24:00:00'::TIME, '13:51:30.123789+02:00'::TIMETZ, '13:51:30.123+02:00'::TIMETZ, 'P1Y2M3DT4H5M6.78S'::INTERVAL,'21016-11-04T13:51:30.123456'::TIMESTAMP, '21016-11-04T13:51:30.123457'::TIMESTAMP, '21016-11-04T13:51:30.124'::TIMESTAMP,'21016-11-04T13:51:30.123456+07:00'::TIMESTAMPTZ,'294247-01-01T23:59:59.999999'::TIMESTAMP,'4713-12-31T23:59:59.999999 BC'::TIMESTAMP,'294247-01-01T23:59:59.999999+00:00'::TIMESTAMPTZ,'4714-12-31T23:59:59.999999Z BC'::TIMESTAMPTZ,'infinity'::TIMESTAMP,'-infinity'::TIMESTAMP,'infinity'::TIMESTAMPTZ,'-infinity'::TIMESTAMPTZ)", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.time_table", schemaAndValuesForDateTimeTypes());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1345"})
    public void shouldNotSnapshotMaterializedViews() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.execute("CREATE TABLE mv_real_table (pk SERIAL, i integer, s VARCHAR(50), PRIMARY KEY(pk));", new String[0]);
        TestHelper.execute("CREATE MATERIALIZED VIEW mv (pk, s) AS SELECT mrv.pk, mrv.s FROM mv_real_table mrv WITH DATA;", new String[0]);
        TestHelper.execute("INSERT INTO mv_real_table (i,s) VALUES (1,'1');", new String[0]);
        TestHelper.execute("REFRESH MATERIALIZED VIEW mv WITH DATA;", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.mv_real_table", schemaAndValueForMaterializedViewBaseType());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1755"})
    public void shouldGenerateSnapshotForPositiveMoney() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO cash_table (csh) VALUES ('$1234.11')", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.cash_table", schemaAndValuesForMoneyTypes());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1755"})
    public void shouldGenerateSnapshotForNegativeMoney() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO cash_table (csh) VALUES ('($1234.11)')", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.cash_table"));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.cash_table", schemaAndValuesForNegativeMoneyTypes());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1755"})
    public void shouldGenerateSnapshotForNullMoney() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO cash_table (csh) VALUES (NULL)", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.cash_table"));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.cash_table", schemaAndValuesForNullMoneyTypes());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1413"})
    public void shouldSnapshotDomainTypeWithPropagatedSourceTypeAttributes() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.execute("CREATE DOMAIN float83 AS numeric(8,3) DEFAULT 0.0;", new String[0]);
        TestHelper.execute("CREATE DOMAIN money2 AS MONEY DEFAULT 0.0;", new String[0]);
        TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, salary money, salary2 money2, a numeric(8,3), area float83, PRIMARY KEY(pk));", new String[0]);
        TestHelper.execute("INSERT INTO alias_table (salary, salary2, a, area) values (7.25, 8.25, 12345.123, 12345.123);", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig().with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE).with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).with("column.propagate.source.type", "public.alias_table.*"));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        List asList = Arrays.asList(new AbstractRecordsProducerTest.SchemaAndValueField("salary", Decimal.builder(2).optional().parameter("__debezium.source.column.type", "MONEY").parameter("__debezium.source.column.length", String.valueOf(Integer.MAX_VALUE)).parameter("__debezium.source.column.scale", "0").build(), BigDecimal.valueOf(7.25d)), new AbstractRecordsProducerTest.SchemaAndValueField("salary2", Decimal.builder(2).optional().parameter("__debezium.source.column.type", "MONEY2").parameter("__debezium.source.column.length", String.valueOf(Integer.MAX_VALUE)).parameter("__debezium.source.column.scale", "0").build(), BigDecimal.valueOf(8.25d)), new AbstractRecordsProducerTest.SchemaAndValueField("a", SchemaBuilder.float64().optional().parameter("__debezium.source.column.type", "NUMERIC").parameter("__debezium.source.column.length", "8").parameter("__debezium.source.column.scale", "3").build(), Double.valueOf(12345.123d)), new AbstractRecordsProducerTest.SchemaAndValueField("area", SchemaBuilder.float64().optional().parameter("__debezium.source.column.type", "FLOAT83").parameter("__debezium.source.column.length", "8").parameter("__debezium.source.column.scale", "3").build(), Double.valueOf(12345.123d)));
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, Collect.hashMapOf("public.alias_table", asList));
        });
    }

    @Test
    @FixFor({"DBZ-1413"})
    public void shouldSnapshotDomainAliasWithProperModifiers() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.execute("CREATE DOMAIN varbit2 AS varbit(3);", new String[0]);
        TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, value varbit2 NOT NULL, PRIMARY KEY(pk));", new String[0]);
        TestHelper.execute("INSERT INTO alias_table (value) values (B'101');", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig().with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE).with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).with("column.propagate.source.type", "public.alias_table.value"));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        List singletonList = Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("value", Bits.builder(3).parameter("__debezium.source.column.type", "VARBIT2").parameter("__debezium.source.column.length", "3").parameter("__debezium.source.column.scale", "0").build(), new byte[]{5}));
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, Collect.hashMapOf("public.alias_table", singletonList));
        });
    }

    @Test
    @FixFor({"DBZ-1413"})
    public void shouldSnapshotDomainTypesLikeBaseTypes() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.execute("CREATE DOMAIN bit2 AS BIT(3);", new String[0]);
        TestHelper.execute("CREATE DOMAIN smallint2 AS smallint;", new String[0]);
        TestHelper.execute("CREATE DOMAIN integer2 as integer;", new String[0]);
        TestHelper.execute("CREATE DOMAIN bigint2 as bigint;", new String[0]);
        TestHelper.execute("CREATE DOMAIN real2 as real;", new String[0]);
        TestHelper.execute("CREATE DOMAIN bool2 AS BOOL DEFAULT false;", new String[0]);
        TestHelper.execute("CREATE DOMAIN float82 as float8;", new String[0]);
        TestHelper.execute("CREATE DOMAIN numeric2 as numeric(6,2);", new String[0]);
        TestHelper.execute("CREATE DOMAIN string2 AS varchar(25) DEFAULT NULL;", new String[0]);
        TestHelper.execute("CREATE DOMAIN date2 AS date;", new String[0]);
        TestHelper.execute("CREATE DOMAIN time2 as time;", new String[0]);
        TestHelper.execute("CREATE DOMAIN timetz2 as timetz;", new String[0]);
        TestHelper.execute("CREATE DOMAIN timestamp2 as timestamp;", new String[0]);
        TestHelper.execute("CREATE DOMAIN timestamptz2 AS timestamptz;", new String[0]);
        TestHelper.execute("CREATE DOMAIN timewotz2 as time without time zone;", new String[0]);
        TestHelper.execute("CREATE DOMAIN box2 as box;", new String[0]);
        TestHelper.execute("CREATE DOMAIN circle2 as circle;", new String[0]);
        TestHelper.execute("CREATE DOMAIN interval2 as interval;", new String[0]);
        TestHelper.execute("CREATE DOMAIN line2 as line;", new String[0]);
        TestHelper.execute("CREATE DOMAIN lseg2 as lseg;", new String[0]);
        TestHelper.execute("CREATE DOMAIN path2 as path;", new String[0]);
        TestHelper.execute("CREATE DOMAIN point2 as point;", new String[0]);
        TestHelper.execute("CREATE DOMAIN polygon2 as polygon;", new String[0]);
        TestHelper.execute("CREATE DOMAIN char2 as char;", new String[0]);
        TestHelper.execute("CREATE DOMAIN text2 as text;", new String[0]);
        TestHelper.execute("CREATE DOMAIN json2 as json;", new String[0]);
        TestHelper.execute("CREATE DOMAIN xml2 as xml;", new String[0]);
        TestHelper.execute("CREATE DOMAIN uuid2 as uuid;", new String[0]);
        TestHelper.execute("CREATE DOMAIN varbit2 as varbit(3);", new String[0]);
        TestHelper.execute("CREATE DOMAIN inet2 as inet;", new String[0]);
        TestHelper.execute("CREATE DOMAIN cidr2 as cidr;", new String[0]);
        TestHelper.execute("CREATE DOMAIN macaddr2 as macaddr;", new String[0]);
        TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, bit_base bit(3) NOT NULL, bit_alias bit2 NOT NULL, smallint_base smallint NOT NULL, smallint_alias smallint2 NOT NULL, integer_base integer NOT NULL, integer_alias integer2 NOT NULL, bigint_base bigint NOT NULL, bigint_alias bigint2 NOT NULL, real_base real NOT NULL, real_alias real2 NOT NULL, float8_base float8 NOT NULL, float8_alias float82 NOT NULL, numeric_base numeric(6,2) NOT NULL, numeric_alias numeric2 NOT NULL, bool_base bool NOT NULL, bool_alias bool2 NOT NULL, string_base varchar(25) NOT NULL, string_alias string2 NOT NULL, date_base date NOT NULL, date_alias date2 NOT NULL, time_base time NOT NULL, time_alias time2 NOT NULL, timetz_base timetz NOT NULL, timetz_alias timetz2 NOT NULL, timestamp_base timestamp NOT NULL, timestamp_alias timestamp2 NOT NULL, timestamptz_base timestamptz NOT NULL, timestamptz_alias timestamptz2 NOT NULL, timewottz_base time without time zone NOT NULL, timewottz_alias timewotz2 NOT NULL, box_base box NOT NULL, box_alias box2 NOT NULL, circle_base circle NOT NULL, circle_alias circle2 NOT NULL, interval_base interval NOT NULL, interval_alias interval2 NOT NULL, line_base line NOT NULL, line_alias line2 NOT NULL, lseg_base lseg NOT NULL, lseg_alias lseg2 NOT NULL, path_base path NOT NULL, path_alias path2 NOT NULL, point_base point NOT NULL, point_alias point2 NOT NULL, polygon_base polygon NOT NULL, polygon_alias polygon2 NOT NULL, char_base char NOT NULL, char_alias char2 NOT NULL, text_base text NOT NULL, text_alias text2 NOT NULL, json_base json NOT NULL, json_alias json2 NOT NULL, xml_base xml NOT NULL, xml_alias xml2 NOT NULL, uuid_base UUID NOT NULL, uuid_alias uuid2 NOT NULL, varbit_base varbit(3) NOT NULL, varbit_alias varbit2 NOT NULL, inet_base inet NOT NULL, inet_alias inet2 NOT NULL, cidr_base cidr NOT NULL, cidr_alias cidr2 NOT NULL, macaddr_base macaddr NOT NULL, macaddr_alias macaddr2 NOT NULL, PRIMARY KEY(pk));", new String[0]);
        TestHelper.execute("INSERT INTO alias_table (bit_base, bit_alias, smallint_base, smallint_alias, integer_base, integer_alias, bigint_base, bigint_alias, real_base, real_alias, float8_base, float8_alias, numeric_base, numeric_alias, bool_base, bool_alias, string_base, string_alias, date_base, date_alias, time_base, time_alias, timetz_base, timetz_alias, timestamp_base, timestamp_alias, timestamptz_base, timestamptz_alias, timewottz_base, timewottz_alias, box_base, box_alias, circle_base, circle_alias, interval_base, interval_alias, line_base, line_alias, lseg_base, lseg_alias, path_base, path_alias, point_base, point_alias, polygon_base, polygon_alias, char_base, char_alias, text_base, text_alias, json_base, json_alias, xml_base, xml_alias, uuid_base, uuid_alias, varbit_base, varbit_alias, inet_base, inet_alias, cidr_base, cidr_alias, macaddr_base, macaddr_alias ) VALUES (B'101', B'101', 1, 1, 1, 1, 1000, 1000, 3.14, 3.14, 3.14, 3.14, 1234.12, 1234.12, true, true, 'hello', 'hello', '2019-10-02', '2019-10-02', '01:02:03', '01:02:03', '01:02:03.123789Z', '01:02:03.123789Z', '2019-10-02T01:02:03.123456', '2019-10-02T01:02:03.123456', '2019-10-02T13:51:30.123456+02:00'::TIMESTAMPTZ, '2019-10-02T13:51:30.123456+02:00'::TIMESTAMPTZ, '01:02:03', '01:02:03', '(0,0),(1,1)', '(0,0),(1,1)', '10,4,10', '10,4,10', '1 year 2 months 3 days 4 hours 5 minutes 6 seconds', '1 year 2 months 3 days 4 hours 5 minutes 6 seconds', '(0,0),(0,1)', '(0,0),(0,1)', '((0,0),(0,1))', '((0,0),(0,1))', '((0,0),(0,1),(0,2))', '((0,0),(0,1),(0,2))', '(1,1)', '(1,1)', '((0,0),(0,1),(1,0),(0,0))', '((0,0),(0,1),(1,0),(0,0))', 'a', 'a', 'Hello World', 'Hello World', '{\"key\": \"value\"}', '{\"key\": \"value\"}', XML('<foo>Hello</foo>'), XML('<foo>Hello</foo>'), '40e6215d-b5c6-4896-987c-f30f3678f608', '40e6215d-b5c6-4896-987c-f30f3678f608', B'101', B'101', '192.168.0.1', '192.168.0.1', '192.168/24', '192.168/24', '08:00:2b:01:02:03', '08:00:2b:01:02:03' );", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE).with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.alias_table"));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        List<AbstractRecordsProducerTest.SchemaAndValueField> schemasAndValuesForDomainAliasTypes = schemasAndValuesForDomainAliasTypes(false);
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, Collect.hashMapOf("public.alias_table", schemasAndValuesForDomainAliasTypes));
        });
    }

    @FixFor({"DBZ-1413"})
    public void shouldSnapshotNestedDomainAliasTypeModifiersNotPropagated() throws Exception {
        TestHelper.execute("CREATE DOMAIN varbit2 AS varbit(3);", new String[0]);
        TestHelper.execute("CREATE DOMAIN varbit2b AS varbit2;", new String[0]);
        TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, value varbit2b NOT NULL, PRIMARY KEY (pk));", new String[0]);
        TestHelper.execute("INSERT INTO alias_table (value) values (B'101');", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE).with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        List singletonList = Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("value", Bits.builder(3).build(), new byte[]{5, 0}));
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, Collect.hashMapOf("public.alias_table", singletonList));
        });
    }

    @Test
    @FixFor({"DBZ-920"})
    public void shouldSnapshotEnumAsKnownType() throws Exception {
        TestHelper.execute("CREATE TYPE test_type AS ENUM ('V1', 'V2');", new String[0]);
        TestHelper.execute("CREATE TABLE enum_table (pk SERIAL, value test_type NOT NULL, primary key(pk));", new String[0]);
        TestHelper.execute("INSERT INTO enum_table (value) values ('V1');", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.enum_table").with("column.propagate.source.type", "public.enum_table.value"));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        List singletonList = Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("value", Enum.builder("V1,V2").parameter("__debezium.source.column.type", "TEST_TYPE").parameter("__debezium.source.column.length", String.valueOf(Integer.MAX_VALUE)).parameter("__debezium.source.column.scale", "0").build(), "V1"));
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, Collect.hashMapOf("public.enum_table", singletonList));
        });
    }

    @Test
    @FixFor({"DBZ-1969"})
    public void shouldSnapshotEnumArrayAsKnownType() throws Exception {
        TestHelper.execute("CREATE TYPE test_type AS ENUM ('V1', 'V2');", new String[0]);
        TestHelper.execute("CREATE TABLE enum_array_table (pk SERIAL, value test_type[] NOT NULL, primary key(pk));", new String[0]);
        TestHelper.execute("INSERT INTO enum_array_table (value) values ('{V1, V2}');", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, false).with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.enum_array_table").with("column.propagate.source.type", "public.enum_array_table.value"));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        List singletonList = Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("value", SchemaBuilder.array(Enum.builder("V1,V2")).parameter("__debezium.source.column.type", "_TEST_TYPE").parameter("__debezium.source.column.length", String.valueOf(Integer.MAX_VALUE)).parameter("__debezium.source.column.scale", "0").build(), Arrays.asList("V1", "V2")));
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, Collect.hashMapOf("public.enum_array_table", singletonList));
        });
    }

    @Test
    @FixFor({"DBZ-1969"})
    public void shouldSnapshotTimeArrayTypesAsKnownTypes() throws Exception {
        TestHelper.execute("CREATE TABLE time_array_table (pk SERIAL, timea time[] NOT NULL, timetza timetz[] NOT NULL, timestampa timestamp[] NOT NULL, timestamptza timestamptz[] NOT NULL, primary key(pk));", new String[0]);
        TestHelper.execute("INSERT INTO time_array_table (timea, timetza, timestampa, timestamptza) values ('{00:01:02,01:02:03}', '{13:51:02+0200,14:51:03+0200}', '{2020-04-01 00:01:02,2020-04-01 01:02:03}', '{2020-04-01 13:51:02+0200,2020-04-01 14:51:03+0200}')", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, false).with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.time_array_table"));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, Collect.hashMapOf("public.time_array_table", schemaAndValuesForTimeArrayTypes()));
        });
    }

    @Test
    @FixFor({"DBZ-1814"})
    public void shouldGenerateSnapshotForByteaAsBytes() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO bytea_binmode_table (ba) VALUES (E'\\\\001\\\\002\\\\003'::bytea)", new String[0]);
        buildNoStreamProducer(TestHelper.defaultConfig());
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.bytea_binmode_table", schemaAndValueForByteaBytes());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1814"})
    public void shouldGenerateSnapshotForByteaAsBase64String() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO bytea_binmode_table (ba) VALUES (E'\\\\001\\\\002\\\\003'::bytea)", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.BINARY_HANDLING_MODE, CommonConnectorConfig.BinaryHandlingMode.BASE64));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.bytea_binmode_table", schemaAndValueForByteaBase64());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1814"})
    public void shouldGenerateSnapshotForByteaAsHexString() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO bytea_binmode_table (ba) VALUES (E'\\\\001\\\\002\\\\003'::bytea)", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.BINARY_HANDLING_MODE, CommonConnectorConfig.BinaryHandlingMode.HEX));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.bytea_binmode_table", schemaAndValueForByteaHex());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1814"})
    public void shouldGenerateSnapshotForUnknownColumnAsBytes() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO circle_table (ccircle) VALUES ('((10, 20),10)'::circle)", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.circle_table", schemaAndValueForUnknownColumnBytes());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1814"})
    public void shouldGenerateSnapshotForUnknownColumnAsBase64() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO circle_table (ccircle) VALUES ('((10, 20),10)'::circle)", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).with(PostgresConnectorConfig.BINARY_HANDLING_MODE, CommonConnectorConfig.BinaryHandlingMode.BASE64));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.circle_table", schemaAndValueForUnknownColumnBase64());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    @Test
    @FixFor({"DBZ-1814"})
    public void shouldGenerateSnapshotForUnknownColumnAsHex() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("INSERT INTO circle_table (ccircle) VALUES ('((10, 20),10)'::circle)", new String[0]);
        buildNoStreamProducer((Configuration.Builder) TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).with(PostgresConnectorConfig.BINARY_HANDLING_MODE, CommonConnectorConfig.BinaryHandlingMode.HEX));
        AbstractRecordsProducerTest.TestConsumer testConsumer = testConsumer(1, "public");
        testConsumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        Map hashMapOf = Collect.hashMapOf("public.circle_table", schemaAndValueForUnknownColumnHex());
        testConsumer.process(sourceRecord -> {
            assertReadRecord(sourceRecord, hashMapOf);
        });
    }

    private void buildNoStreamProducer(Configuration.Builder builder) {
        start(PostgresConnector.class, builder.with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL_ONLY).with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
    }

    private void buildWithStreamProducer(Configuration.Builder builder) {
        start(PostgresConnector.class, builder.with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.ALWAYS).with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
    }
}
