package io.debezium.connector.postgresql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.postgresql.AbstractRecordsProducerTest;
import io.debezium.connector.postgresql.DecoderDifferences;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.junit.ConditionalFail;
import io.debezium.junit.ShouldFailWhen;
import io.debezium.relational.TableId;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import junit.framework.TestCase;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/* loaded from: input_file:io/debezium/connector/postgresql/RecordsStreamProducerIT.class */
public class RecordsStreamProducerIT extends AbstractRecordsProducerTest {
    private RecordsStreamProducer recordsProducer;
    private AbstractRecordsProducerTest.TestConsumer consumer;
    private final Consumer<Throwable> blackHole = th -> {
    };

    @Rule
    public TestRule conditionalFail = new ConditionalFail();

    @Before
    public void before() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("init_postgis.ddl");
        TestHelper.execute("CREATE SCHEMA IF NOT EXISTS public;DROP TABLE IF EXISTS test_table;CREATE TABLE test_table (pk SERIAL, text TEXT, PRIMARY KEY(pk));CREATE TABLE table_with_interval (id SERIAL PRIMARY KEY, title VARCHAR(512) NOT NULL, time_limit INTERVAL DEFAULT '60 days'::INTERVAL NOT NULL);INSERT INTO test_table(text) VALUES ('insert');");
        setupRecordsProducer(new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).build()));
    }

    @After
    public void after() throws Exception {
        if (this.recordsProducer != null) {
            this.recordsProducer.stop();
        }
    }

    @Test
    public void shouldReceiveChangesForInsertsWithDifferentDataTypes() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        this.consumer = testConsumer(1, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        assertInsert("INSERT INTO numeric_table (si, i, bi, r, db, r_int, db_int, r_nan, db_nan, r_pinf, db_pinf, r_ninf, db_ninf, ss, bs, b) VALUES (1, 123456, 1234567890123, 3.3, 4.44, 3, 4, 'NaN', 'NaN', 'Infinity', 'Infinity', '-Infinity', '-Infinity', 1, 123, true)", schemasAndValuesForNumericType());
        this.consumer.expects(1);
        assertInsert("INSERT INTO numeric_decimal_table (d, dzs, dvs, d_nn, n, nzs, nvs, d_int, dzs_int, dvs_int, n_int, nzs_int, nvs_int, d_nan, dzs_nan, dvs_nan, n_nan, nzs_nan, nvs_nan) VALUES (1.1, 10.11, 10.1111, 3.30, 22.22, 22.2, 22.2222, 1, 10, 10, 22, 22, 22, null, null, null, null, null, null)", schemasAndValuesForBigDecimalEncodedNumericTypes());
        this.consumer.expects(1);
        assertInsert("INSERT INTO string_table (vc, vcv, ch, c, t, b, bnn) VALUES ('žš', 'bb', 'cdef', 'abc', 'some text', E'\\\\000\\\\001\\\\002'::bytea, E'\\\\003\\\\004\\\\005'::bytea)", schemasAndValuesForStringTypes());
        this.consumer.expects(1);
        assertInsert("INSERT INTO cash_table (csh) VALUES ('$1234.11')", schemaAndValuesForMoneyTypes());
        this.consumer.expects(1);
        assertInsert("INSERT INTO bitbin_table (ba, bol, bs, bv) VALUES (E'\\\\001\\\\002\\\\003'::bytea, '0'::bit(1), '11'::bit(2), '00'::bit(2))", schemaAndValuesForBinTypes());
        this.consumer.expects(1);
        assertInsert("INSERT INTO time_table(ts, tsneg, ts_ms, ts_us, tz, date, ti, tip, ttz, tptz, it) VALUES ('2016-11-04T13:51:30.123456'::TIMESTAMP, '1936-10-25T22:10:12.608'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30+02:00'::TIMESTAMPTZ, '2016-11-04'::DATE, '13:51:30'::TIME, '13:51:30.123'::TIME, '13:51:30+02:00'::TIMETZ, '13:51:30.123+02:00'::TIMETZ, 'P1Y2M3DT4H5M0S'::INTERVAL)", schemaAndValuesForDateTimeTypes());
        this.consumer.expects(1);
        assertInsert("INSERT INTO text_table(j, jb, x, u) VALUES ('{\"bar\": \"baz\"}'::json, '{\"bar\": \"baz\"}'::jsonb, '<foo>bar</foo><foo>bar</foo>'::xml, 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::UUID)", schemasAndValuesForTextTypes());
        this.consumer.expects(1);
        assertInsert("INSERT INTO geom_table(p) VALUES ('(1,1)'::point)", schemaAndValuesForGeomTypes());
        this.consumer.expects(1);
        assertInsert("INSERT INTO tstzrange_table (unbounded_exclusive_range, bounded_inclusive_range) VALUES ('[2017-06-05 11:29:12.549426+00,)', '[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]')", schemaAndValuesForTstzRangeTypes());
        this.consumer.expects(1);
        assertInsert("INSERT INTO custom_table (lt, i, n) VALUES ('Top.Collections.Pictures.Astronomy.Galaxies', '978-0-393-04002-9', NULL)", schemasAndValuesForCustomTypes());
    }

    @Test(timeout = 30000)
    public void shouldReceiveChangesForInsertsWithPostgisTypes() throws Exception {
        TestHelper.executeDDL("postgis_create_tables.ddl");
        this.consumer = testConsumer(1, "public");
        this.consumer.setIgnoreExtraRecords(true);
        this.recordsProducer.start(this.consumer, this.blackHole);
        TestHelper.execute("INSERT INTO public.dummy_table DEFAULT VALUES;");
        this.consumer.await(TestHelper.waitTimeForRecords() * 10, TimeUnit.SECONDS);
        while (true) {
            if (!this.consumer.isEmpty() && this.consumer.remove().topic().endsWith(".public.dummy_table")) {
                this.consumer.expects(1);
                assertInsert("INSERT INTO public.postgis_table (p, ml) VALUES ('SRID=3187;POINT(174.9479 -36.7208)'::postgis.geometry, 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::postgis.geography)", schemaAndValuesForPostgisTypes());
                return;
            }
        }
    }

    @Test(timeout = 30000)
    public void shouldReceiveChangesForInsertsWithPostgisArrayTypes() throws Exception {
        TestHelper.executeDDL("postgis_create_tables.ddl");
        this.consumer = testConsumer(1, "public");
        this.consumer.setIgnoreExtraRecords(true);
        this.recordsProducer.start(this.consumer, this.blackHole);
        TestHelper.execute("INSERT INTO public.dummy_table DEFAULT VALUES;");
        this.consumer.await(TestHelper.waitTimeForRecords() * 10, TimeUnit.SECONDS);
        while (true) {
            if (!this.consumer.isEmpty() && this.consumer.remove().topic().endsWith(".public.dummy_table")) {
                this.consumer.expects(1);
                assertInsert("INSERT INTO public.postgis_array_table (ga, gann) VALUES (ARRAY['GEOMETRYCOLLECTION EMPTY'::postgis.geometry, 'POLYGON((166.51 -46.64, 178.52 -46.64, 178.52 -34.45, 166.51 -34.45, 166.51 -46.64))'::postgis.geometry], ARRAY['GEOMETRYCOLLECTION EMPTY'::postgis.geometry, 'POLYGON((166.51 -46.64, 178.52 -46.64, 178.52 -34.45, 166.51 -34.45, 166.51 -46.64))'::postgis.geometry])", schemaAndValuesForPostgisArrayTypes());
                return;
            }
        }
    }

    @Test
    @ShouldFailWhen(DecoderDifferences.AreQuotedIdentifiersUnsupported.class)
    public void shouldReceiveChangesForInsertsWithQuotedNames() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        this.consumer = testConsumer(1, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        assertInsert("INSERT INTO \"Quoted_\"\" . Schema\".\"Quoted_\"\" . Table\" (\"Quoted_\"\" . Text_Column\") VALUES ('some text')", schemasAndValuesForQuotedTypes());
    }

    @Test
    public void shouldReceiveChangesForInsertsWithArrayTypes() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        this.consumer = testConsumer(1, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        assertInsert("INSERT INTO array_table (int_array, bigint_array, text_array, char_array, varchar_array, date_array, numeric_array, varnumeric_array) VALUES ('{1,2,3}', '{1550166368505037572}', '{\"one\",\"two\",\"three\"}', '{\"cone\",\"ctwo\",\"cthree\"}', '{\"vcone\",\"vctwo\",\"vcthree\"}', '{2016-11-04,2016-11-05,2016-11-06}', '{1.2,3.4,5.6}', '{1.1,2.22,3.333}')", schemasAndValuesForArrayTypes());
    }

    @Test
    @FixFor({"DBZ-478"})
    public void shouldReceiveChangesForNullInsertsWithArrayTypes() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        this.consumer = testConsumer(1, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        assertInsert("INSERT INTO array_table_with_nulls (int_array, bigint_array, text_array, date_array, numeric_array, varnumeric_array) VALUES (null, null, null, null, null, null)", schemasAndValuesForArrayTypesWithNullValues());
    }

    @Test
    public void shouldReceiveChangesForNewTable() throws Exception {
        this.consumer = testConsumer(1, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        executeAndWait("CREATE SCHEMA s1;CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (11);");
        assertRecordInserted("s1.a", "pk", 1);
    }

    @Test
    public void shouldReceiveChangesForRenamedTable() throws Exception {
        this.consumer = testConsumer(1, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        executeAndWait("DROP TABLE IF EXISTS renamed_test_table;ALTER TABLE test_table RENAME TO renamed_test_table;INSERT INTO renamed_test_table (text) VALUES ('new');");
        assertRecordInserted("public.renamed_test_table", "pk", 2);
    }

    @Test
    public void shouldReceiveChangesForUpdates() throws Exception {
        this.consumer = testConsumer(1, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        executeAndWait("UPDATE test_table set text='update' WHERE pk=1");
        SourceRecord remove = this.consumer.remove();
        String str = TestHelper.topicName("public.test_table");
        TestCase.assertEquals(str, remove.topic());
        VerifyRecord.isValidUpdate(remove, "pk", 1);
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update")), remove, "after");
        this.consumer.expects(1);
        TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY FULL");
        executeAndWait("UPDATE test_table set text='update2' WHERE pk=1");
        SourceRecord remove2 = this.consumer.remove();
        TestCase.assertEquals(str, remove2.topic());
        VerifyRecord.isValidUpdate(remove2, "pk", 1);
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update")), remove2, "before");
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update2")), remove2, "after");
    }

    @Test
    public void shouldReceiveChangesForUpdatesWithColumnChanges() throws Exception {
        this.consumer = testConsumer(1, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        executeAndWait("ALTER TABLE test_table ADD COLUMN uvc VARCHAR(2);ALTER TABLE test_table REPLICA IDENTITY FULL;UPDATE test_table SET uvc ='aa' WHERE pk = 1;");
        SourceRecord remove = this.consumer.remove();
        TestCase.assertEquals(TestHelper.topicName("public.test_table"), remove.topic());
        VerifyRecord.isValidUpdate(remove, "pk", 1);
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("uvc", null, null)), remove, "before");
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("uvc", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "aa")), remove, "after");
        this.consumer.expects(1);
        executeAndWait("ALTER TABLE test_table RENAME COLUMN uvc to xvc;UPDATE test_table SET xvc ='bb' WHERE pk = 1;");
        SourceRecord remove2 = this.consumer.remove();
        VerifyRecord.isValidUpdate(remove2, "pk", 1);
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("xvc", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "aa")), remove2, "before");
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("xvc", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "bb")), remove2, "after");
        this.consumer.expects(1);
        executeAndWait("ALTER TABLE test_table DROP COLUMN xvc;UPDATE test_table SET text ='update' WHERE pk = 1;");
        VerifyRecord.isValidUpdate(this.consumer.remove(), "pk", 1);
        this.consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ADD COLUMN modtype INTEGER;INSERT INTO test_table (pk,modtype) VALUES (2,1);");
        SourceRecord remove3 = this.consumer.remove();
        VerifyRecord.isValidInsert(remove3, "pk", 2);
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("modtype", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 1)), remove3, "after");
        this.consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ALTER COLUMN modtype TYPE SMALLINT;UPDATE test_table SET modtype = 2 WHERE pk = 2;");
        SourceRecord remove4 = this.consumer.remove();
        VerifyRecord.isValidUpdate(remove4, "pk", 2);
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("modtype", SchemaBuilder.OPTIONAL_INT16_SCHEMA, (short) 1)), remove4, "before");
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("modtype", SchemaBuilder.OPTIONAL_INT16_SCHEMA, (short) 2)), remove4, "after");
    }

    @Test
    public void shouldReceiveChangesForUpdatesWithPKChanges() throws Exception {
        this.consumer = testConsumer(3, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        executeAndWait("UPDATE test_table SET text = 'update', pk = 2");
        String str = TestHelper.topicName("public.test_table");
        SourceRecord remove = this.consumer.remove();
        TestCase.assertEquals(str, remove.topic());
        VerifyRecord.isValidDelete(remove, "pk", 1);
        SourceRecord remove2 = this.consumer.remove();
        TestCase.assertEquals(str, remove2.topic());
        VerifyRecord.isValidTombstone(remove2, "pk", 1);
        SourceRecord remove3 = this.consumer.remove();
        TestCase.assertEquals(str, remove3.topic());
        VerifyRecord.isValidInsert(remove3, "pk", 2);
    }

    @Test
    @FixFor({"DBZ-582"})
    public void shouldReceiveChangesForUpdatesWithPKChangesWithoutTombstone() throws Exception {
        setupRecordsProducer(new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).build()));
        this.consumer = testConsumer(2, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        executeAndWait("UPDATE test_table SET text = 'update', pk = 2");
        String str = TestHelper.topicName("public.test_table");
        SourceRecord remove = this.consumer.remove();
        TestCase.assertEquals(str, remove.topic());
        VerifyRecord.isValidDelete(remove, "pk", 1);
        SourceRecord remove2 = this.consumer.remove();
        TestCase.assertEquals(str, remove2.topic());
        VerifyRecord.isValidInsert(remove2, "pk", 2);
    }

    @Test
    public void shouldReceiveChangesForDefaultValues() throws Exception {
        this.consumer = testConsumer(1, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        executeAndWait("ALTER TABLE test_table REPLICA IDENTITY FULL;ALTER TABLE test_table ADD COLUMN default_column TEXT DEFAULT 'default';INSERT INTO test_table (text) VALUES ('update');");
        SourceRecord remove = this.consumer.remove();
        TestCase.assertEquals(TestHelper.topicName("public.test_table"), remove.topic());
        VerifyRecord.isValidInsert(remove, "pk", 2);
        assertRecordSchemaAndValues(Arrays.asList(new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update"), new AbstractRecordsProducerTest.SchemaAndValueField("default_column", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "default")), remove, "after");
    }

    @Test
    public void shouldReceiveChangesForTypeConstraints() throws Exception {
        this.consumer = testConsumer(1, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        executeAndWait("ALTER TABLE test_table ADD COLUMN num_val NUMERIC(5,2);ALTER TABLE test_table REPLICA IDENTITY FULL;UPDATE test_table SET num_val = 123.45 WHERE pk = 1;");
        SourceRecord remove = this.consumer.remove();
        TestCase.assertEquals(TestHelper.topicName("public.test_table"), remove.topic());
        VerifyRecord.isValidUpdate(remove, "pk", 1);
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", null, null)), remove, "before");
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", Decimal.builder(2).optional().build(), new BigDecimal("123.45"))), remove, "after");
        this.consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ALTER COLUMN num_val TYPE NUMERIC(6,1);INSERT INTO test_table (pk,num_val) VALUES (2,123.41);");
        SourceRecord remove2 = this.consumer.remove();
        VerifyRecord.isValidInsert(remove2, "pk", 2);
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", Decimal.builder(1).optional().build(), new BigDecimal("123.4"))), remove2, "after");
        this.consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ALTER COLUMN num_val TYPE NUMERIC;INSERT INTO test_table (pk,num_val) VALUES (3,123.4567);");
        SourceRecord remove3 = this.consumer.remove();
        Struct struct = new Struct(VariableScaleDecimal.schema());
        struct.put("scale", 4).put("value", new BigDecimal("123.4567").unscaledValue().toByteArray());
        VerifyRecord.isValidInsert(remove3, "pk", 3);
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", VariableScaleDecimal.builder().optional().build(), struct)), remove3, "after");
        this.consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ALTER COLUMN num_val TYPE DECIMAL(12,4);INSERT INTO test_table (pk,num_val) VALUES (4,2.48);");
        SourceRecord remove4 = this.consumer.remove();
        VerifyRecord.isValidInsert(remove4, "pk", 4);
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", Decimal.builder(4).optional().build(), new BigDecimal("2.4800"))), remove4, "after");
        this.consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ALTER COLUMN num_val TYPE DECIMAL(12);INSERT INTO test_table (pk,num_val) VALUES (5,1238);");
        SourceRecord remove5 = this.consumer.remove();
        VerifyRecord.isValidInsert(remove5, "pk", 5);
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", Decimal.builder(0).optional().build(), new BigDecimal("1238"))), remove5, "after");
        this.consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ALTER COLUMN num_val TYPE DECIMAL;INSERT INTO test_table (pk,num_val) VALUES (6,1225.1);");
        SourceRecord remove6 = this.consumer.remove();
        Struct struct2 = new Struct(VariableScaleDecimal.schema());
        struct2.put("scale", 1).put("value", new BigDecimal("1225.1").unscaledValue().toByteArray());
        VerifyRecord.isValidInsert(remove6, "pk", 6);
        assertRecordSchemaAndValues(Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", VariableScaleDecimal.builder().optional().build(), struct2)), remove6, "after");
    }

    @Test
    public void shouldReceiveChangesForDeletes() throws Exception {
        this.consumer = testConsumer(5, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        executeAndWait("INSERT INTO test_table (text) VALUES ('insert2');DELETE FROM test_table WHERE pk > 0;");
        String str = TestHelper.topicName("public.test_table");
        assertRecordInserted("public.test_table", "pk", 2);
        SourceRecord remove = this.consumer.remove();
        TestCase.assertEquals(str, remove.topic());
        VerifyRecord.isValidDelete(remove, "pk", 1);
        SourceRecord remove2 = this.consumer.remove();
        TestCase.assertEquals(str, remove2.topic());
        VerifyRecord.isValidTombstone(remove2, "pk", 1);
        SourceRecord remove3 = this.consumer.remove();
        TestCase.assertEquals(str, remove3.topic());
        VerifyRecord.isValidDelete(remove3, "pk", 2);
        SourceRecord remove4 = this.consumer.remove();
        TestCase.assertEquals(str, remove4.topic());
        VerifyRecord.isValidTombstone(remove4, "pk", 2);
    }

    @Test
    @FixFor({"DBZ-582"})
    public void shouldReceiveChangesForDeletesWithoutTombstone() throws Exception {
        setupRecordsProducer(new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false).build()));
        this.consumer = testConsumer(3, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        executeAndWait("INSERT INTO test_table (text) VALUES ('insert2');DELETE FROM test_table WHERE pk > 0;");
        String str = TestHelper.topicName("public.test_table");
        assertRecordInserted("public.test_table", "pk", 2);
        SourceRecord remove = this.consumer.remove();
        TestCase.assertEquals(str, remove.topic());
        VerifyRecord.isValidDelete(remove, "pk", 1);
        SourceRecord remove2 = this.consumer.remove();
        TestCase.assertEquals(str, remove2.topic());
        VerifyRecord.isValidDelete(remove2, "pk", 2);
    }

    @Test
    public void shouldReceiveNumericTypeAsDouble() throws Exception {
        setupRecordsProducer(new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, PostgresConnectorConfig.DecimalHandlingMode.DOUBLE).build()));
        TestHelper.executeDDL("postgres_create_tables.ddl");
        this.consumer = testConsumer(1, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        assertInsert("INSERT INTO numeric_decimal_table (d, dzs, dvs, d_nn, n, nzs, nvs, d_int, dzs_int, dvs_int, n_int, nzs_int, nvs_int, d_nan, dzs_nan, dvs_nan, n_nan, nzs_nan, nvs_nan) VALUES (1.1, 10.11, 10.1111, 3.30, 22.22, 22.2, 22.2222, 1, 10, 10, 22, 22, 22, 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN')", schemasAndValuesForDoubleEncodedNumericTypes());
    }

    @Test
    @FixFor({"DBZ-611"})
    public void shouldReceiveNumericTypeAsString() throws Exception {
        setupRecordsProducer(new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, PostgresConnectorConfig.DecimalHandlingMode.STRING).build()));
        TestHelper.executeDDL("postgres_create_tables.ddl");
        this.consumer = testConsumer(1, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        assertInsert("INSERT INTO numeric_decimal_table (d, dzs, dvs, d_nn, n, nzs, nvs, d_int, dzs_int, dvs_int, n_int, nzs_int, nvs_int, d_nan, dzs_nan, dvs_nan, n_nan, nzs_nan, nvs_nan) VALUES (1.1, 10.11, 10.1111, 3.30, 22.22, 22.2, 22.2222, 1, 10, 10, 22, 22, 22, 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN')", schemasAndValuesForStringEncodedNumericTypes());
    }

    @Test
    @FixFor({"DBZ-259"})
    public void shouldProcessIntervalDelete() throws Exception {
        this.consumer = testConsumer(4, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
        executeAndWait("INSERT INTO table_with_interval VALUES (default, 'Foo', default);INSERT INTO table_with_interval VALUES (default, 'Bar', default);DELETE FROM table_with_interval WHERE id = 1;");
        String str = TestHelper.topicName("public.table_with_interval");
        assertRecordInserted("public.table_with_interval", "id", 1);
        assertRecordInserted("public.table_with_interval", "id", 2);
        SourceRecord remove = this.consumer.remove();
        TestCase.assertEquals(str, remove.topic());
        VerifyRecord.isValidDelete(remove, "id", 1);
        SourceRecord remove2 = this.consumer.remove();
        TestCase.assertEquals(str, remove2.topic());
        VerifyRecord.isValidTombstone(remove2, "id", 1);
    }

    @Test
    @FixFor({"DBZ-501"})
    public void shouldNotStartAfterStop() throws Exception {
        this.recordsProducer.stop();
        this.recordsProducer.start(this.consumer, this.blackHole);
        setupRecordsProducer(new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).build()));
        this.consumer = testConsumer(1, new String[0]);
        this.recordsProducer.start(this.consumer, this.blackHole);
    }

    private void setupRecordsProducer(PostgresConnectorConfig postgresConnectorConfig) {
        TopicSelector create = TopicSelector.create(postgresConnectorConfig);
        this.recordsProducer = new RecordsStreamProducer(new PostgresTaskContext(postgresConnectorConfig, new PostgresSchema(postgresConnectorConfig, TestHelper.getTypeRegistry(), create), create), new SourceInfo(postgresConnectorConfig.serverName()));
    }

    private void assertInsert(String str, List<AbstractRecordsProducerTest.SchemaAndValueField> list) {
        assertInsert(str, 1, list);
    }

    private void assertInsert(String str, int i, List<AbstractRecordsProducerTest.SchemaAndValueField> list) {
        TableId tableIdFromInsertStmt = tableIdFromInsertStmt(str);
        String str2 = tableIdFromInsertStmt.schema() + "." + tableIdFromInsertStmt.table();
        try {
            executeAndWait(str);
            SourceRecord assertRecordInserted = assertRecordInserted(str2, "pk", i);
            assertRecordOffset(assertRecordInserted, false, false);
            assertRecordSchemaAndValues(list, assertRecordInserted, "after");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private SourceRecord assertRecordInserted(String str, String str2, int i) throws InterruptedException {
        Assert.assertFalse("records not generated", this.consumer.isEmpty());
        SourceRecord remove = this.consumer.remove();
        TestCase.assertEquals(TestHelper.topicName(str), remove.topic());
        VerifyRecord.isValidInsert(remove, str2, i);
        return remove;
    }

    private void executeAndWait(String str) throws Exception {
        TestHelper.execute(str);
        this.consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
    }
}
