package io.debezium.connector.postgresql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.postgresql.AbstractRecordsProducerTest;
import io.debezium.connector.postgresql.DecoderDifferences;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot;
import io.debezium.data.Bits;
import io.debezium.data.Enum;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.data.VerifyRecord;
import io.debezium.data.geometry.Point;
import io.debezium.doc.FixFor;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.heartbeat.DatabaseHeartbeatImpl;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.ConditionalFail;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.ShouldFailWhen;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.time.MicroTime;
import io.debezium.time.MicroTimestamp;
import io.debezium.time.ZonedTime;
import io.debezium.time.ZonedTimestamp;
import io.debezium.util.Stopwatch;
import io.debezium.util.Testing;
import java.math.BigDecimal;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;
import junit.framework.TestCase;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.fest.assertions.Assertions;
import org.fest.assertions.Fail;
import org.fest.assertions.MapAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;

/* loaded from: input_file:io/debezium/connector/postgresql/RecordsStreamProducerIT.class */
public class RecordsStreamProducerIT extends AbstractRecordsProducerTest {
    // Consumer the tests read emitted records from; it is (re)assigned from testConsumer(...)
    // by startConnector(...) when a snapshot is taken and by individual tests.
    private AbstractRecordsProducerTest.TestConsumer consumer;

    // NOTE(review): presumably evaluates the @SkipWhenDecoderPluginNameIs / @SkipWhenDecoderPluginNameIsNot
    // annotations used on tests in this class — confirm against the rule implementation.
    @Rule
    public final TestRule skip = new SkipTestDependingOnDecoderPluginNameRule();

    // NOTE(review): presumably backs the @ShouldFailWhen annotation used on tests in this class —
    // confirm against ConditionalFail.
    @Rule
    public TestRule conditionalFail = new ConditionalFail();

    /**
     * Per-test setup: calls {@code TestHelper.dropAllSchemas()}, runs {@code init_postgis.ddl},
     * then (re)creates the {@code public} schema with {@code test_table} (seeded with a single
     * {@code 'insert'} row) and {@code table_with_interval}, and finally enables test printing.
     */
    @Before
    public void before() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("init_postgis.ddl");

        String setupStatements = "CREATE SCHEMA IF NOT EXISTS public;DROP TABLE IF EXISTS test_table;CREATE TABLE test_table (pk SERIAL, text TEXT, PRIMARY KEY(pk));CREATE TABLE table_with_interval (id SERIAL PRIMARY KEY, title VARCHAR(512) NOT NULL, time_limit INTERVAL DEFAULT '60 days'::INTERVAL NOT NULL);INSERT INTO test_table(text) VALUES ('insert');";
        TestHelper.execute(setupStatements, new String[0]);

        // NOTE(review): this builder is configured but never handed to anything in this method;
        // startConnector(...) assembles its own configuration. Kept to preserve behaviour.
        Configuration.Builder configBuilder = TestHelper.defaultConfig()
                .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, false)
                .with(PostgresConnectorConfig.SCHEMA_EXCLUDE_LIST, "postgis");
        boolean usesPgOutput = TestHelper.decoderPlugin() == PostgresConnectorConfig.LogicalDecoder.PGOUTPUT;
        if (usesPgOutput) {
            configBuilder.with("database.replication", "database")
                    .with("database.preferQueryMode", "simple")
                    .with("assumeMinServerVersion.set", "9.4");
        }

        Testing.Print.enable();
    }

    /**
     * Starts the Postgres connector, asserts that it is running and calls
     * {@code waitForStreamingToStart()}.
     *
     * @param function  customizer applied to the base configuration (unknown datatypes excluded,
     *                  schema {@code postgis} excluded, snapshot mode chosen from {@code z})
     *                  before the configuration is built
     * @param z         {@code true} selects {@code SnapshotMode.INITIAL} and makes this method
     *                  await and discard one record; {@code false} selects {@code SnapshotMode.NEVER}
     * @param predicate handed through unchanged to {@code start(...)}
     * @throws InterruptedException propagated from the start/wait helpers
     */
    private void startConnector(Function<Configuration.Builder, Configuration.Builder> function, boolean z, Predicate<SourceRecord> predicate) throws InterruptedException {
        PostgresConnectorConfig.SnapshotMode snapshotMode;
        if (z) {
            snapshotMode = PostgresConnectorConfig.SnapshotMode.INITIAL;
        }
        else {
            snapshotMode = PostgresConnectorConfig.SnapshotMode.NEVER;
        }

        Configuration.Builder baseBuilder = TestHelper.defaultConfig()
                .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, false)
                .with(PostgresConnectorConfig.SCHEMA_EXCLUDE_LIST, "postgis")
                .with(PostgresConnectorConfig.SNAPSHOT_MODE, snapshotMode);
        Configuration customized = function.apply(baseBuilder).build();
        // Round-trip through PostgresConnectorConfig exactly as before and start with its Configuration.
        PostgresConnectorConfig connectorConfig = new PostgresConnectorConfig(customized);
        start(PostgresConnector.class, connectorConfig.getConfig(), predicate);

        assertConnectorIsRunning();
        waitForStreamingToStart();

        if (!z) {
            return;
        }
        // With a snapshot enabled, wait for one record and drop it so tests start from a clean consumer.
        consumer = testConsumer(1, new String[0]);
        consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        consumer.remove();
    }

    /**
     * Starts the connector with the given customizer and snapshot flag, passing a predicate
     * that always evaluates to {@code false} to the three-argument overload.
     *
     * @param function configuration customizer
     * @param z        snapshot flag, see the three-argument overload
     */
    private void startConnector(Function<Configuration.Builder, Configuration.Builder> function, boolean z) throws InterruptedException {
        startConnector(function, z, record -> false);
    }

    /**
     * Starts the connector with the given customizer, with the snapshot flag set to {@code true}
     * and a predicate that always evaluates to {@code false}.
     *
     * @param function configuration customizer
     */
    private void startConnector(Function<Configuration.Builder, Configuration.Builder> function) throws InterruptedException {
        startConnector(function, true, record -> false);
    }

    /**
     * Starts the connector with an unmodified base configuration, the snapshot flag set to
     * {@code true} and a predicate that always evaluates to {@code false}.
     */
    private void startConnector() throws InterruptedException {
        startConnector(Function.identity(), true, record -> false);
    }

    /**
     * Inserts one row into each of the data-type tables created by
     * {@code postgres_create_tables.ddl} and checks every insert against the matching
     * expected schema/value list.
     */
    @Test
    public void shouldReceiveChangesForInsertsWithDifferentDataTypes() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector();
        consumer = testConsumer(1, new String[0]);

        // numeric types
        String numericInsert = "INSERT INTO numeric_table (si, i, bi, r, db, r_int, db_int, r_nan, db_nan, r_pinf, db_pinf, r_ninf, db_ninf, ss, bs, b) VALUES (1, 123456, 1234567890123, 3.3, 4.44, 3, 4, 'NaN', 'NaN', 'Infinity', 'Infinity', '-Infinity', '-Infinity', 1, 123, true)";
        consumer.expects(1);
        assertInsert(numericInsert, 1, schemasAndValuesForNumericType());

        // decimal / numeric types
        String decimalInsert = "INSERT INTO numeric_decimal_table (d, dzs, dvs, d_nn, n, nzs, nvs, d_int, dzs_int, dvs_int, n_int, nzs_int, nvs_int, d_nan, dzs_nan, dvs_nan, n_nan, nzs_nan, nvs_nan) VALUES (1.1, 10.11, 10.1111, 3.30, 22.22, 22.2, 22.2222, 1, 10, 10, 22, 22, 22, null, null, null, null, null, null)";
        consumer.expects(1);
        assertInsert(decimalInsert, 1, schemasAndValuesForBigDecimalEncodedNumericTypes());

        // string types
        String stringInsert = "INSERT INTO string_table (vc, vcv, ch, c, t, b, bnn, ct) VALUES ('žš', 'bb', 'cdef', 'abc', 'some text', E'\\\\000\\\\001\\\\002'::bytea, E'\\\\003\\\\004\\\\005'::bytea, 'Hello World')";
        consumer.expects(1);
        assertInsert(stringInsert, 1, schemasAndValuesForStringTypes());

        // money, positive then negative
        String cashInsert = "INSERT INTO cash_table (csh) VALUES ('$1234.11')";
        consumer.expects(1);
        assertInsert(cashInsert, 1, schemaAndValuesForMoneyTypes());

        String negativeCashInsert = "INSERT INTO cash_table (csh) VALUES ('($1234.11)')";
        consumer.expects(1);
        assertInsert(negativeCashInsert, 2, schemaAndValuesForNegativeMoneyTypes());

        // bit / binary types
        String bitBinInsert = "INSERT INTO bitbin_table (ba, bol, bol2, bs, bs7, bv, bv2, bvl, bvunlimited1, bvunlimited2) VALUES (E'\\\\001\\\\002\\\\003'::bytea, '0'::bit(1), '1'::bit(1), '11'::bit(2), '1'::bit(7), '00'::bit(2), '000000110000001000000001'::bit(24),'1000000000000000000000000000000000000000000000000000000000000000'::bit(64), '101', '111011010001000110000001000000001')";
        consumer.expects(1);
        assertInsert(bitBinInsert, 1, schemaAndValuesForBinTypes());

        // date / time types
        String timeInsert = "INSERT INTO time_table(ts, tsneg, ts_ms, ts_us, tz, date, ti, tip, ttf, ttz, tptz, it, ts_large, ts_large_us, ts_large_ms, tz_large, ts_max, ts_min, tz_max, tz_min, ts_pinf, ts_ninf, tz_pinf, tz_ninf) VALUES ('2016-11-04T13:51:30.123456'::TIMESTAMP, '1936-10-25T22:10:12.608'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456+02:00'::TIMESTAMPTZ, '2016-11-04'::DATE, '13:51:30'::TIME, '13:51:30.123'::TIME, '24:00:00'::TIME, '13:51:30.123789+02:00'::TIMETZ, '13:51:30.123+02:00'::TIMETZ, 'P1Y2M3DT4H5M6.78S'::INTERVAL,'21016-11-04T13:51:30.123456'::TIMESTAMP, '21016-11-04T13:51:30.123457'::TIMESTAMP, '21016-11-04T13:51:30.124'::TIMESTAMP,'21016-11-04T13:51:30.123456+07:00'::TIMESTAMPTZ,'294247-01-01T23:59:59.999999'::TIMESTAMP,'4713-12-31T23:59:59.999999 BC'::TIMESTAMP,'294247-01-01T23:59:59.999999+00:00'::TIMESTAMPTZ,'4714-12-31T23:59:59.999999Z BC'::TIMESTAMPTZ,'infinity'::TIMESTAMP,'-infinity'::TIMESTAMP,'infinity'::TIMESTAMPTZ,'-infinity'::TIMESTAMPTZ)";
        consumer.expects(1);
        assertInsert(timeInsert, 1, schemaAndValuesForDateTimeTypes());

        // json / xml / uuid
        String textInsert = "INSERT INTO text_table(j, jb, x, u) VALUES ('{\"bar\": \"baz\"}'::json, '{\"bar\": \"baz\"}'::jsonb, '<foo>bar</foo><foo>bar</foo>'::xml, 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::UUID)";
        consumer.expects(1);
        assertInsert(textInsert, 1, schemasAndValuesForTextTypes());

        // geometric point
        String geomInsert = "INSERT INTO geom_table(p) VALUES ('(1,1)'::point)";
        consumer.expects(1);
        assertInsert(geomInsert, 1, schemaAndValuesForGeomTypes());

        // range types
        String rangeInsert = "INSERT INTO range_table (unbounded_exclusive_tsrange, bounded_inclusive_tsrange, unbounded_exclusive_tstzrange, bounded_inclusive_tstzrange, unbounded_exclusive_daterange, bounded_exclusive_daterange, int4_number_range, numerange, int8_number_range) VALUES ('[2019-03-31 15:30:00, infinity)', '[2019-03-31 15:30:00, 2019-04-30 15:30:00]', '[2017-06-05 11:29:12.549426+00,)', '[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]', '[2019-03-31, infinity)', '[2019-03-31, 2019-04-30)', '[1000,6000)', '[5.3,6.3)', '[1000000,6000000)')";
        consumer.expects(1);
        assertInsert(rangeInsert, 1, schemaAndValuesForRangeTypes());
    }

    /**
     * Runs the connector with {@code INTERVAL_HANDLING_MODE} set to {@code STRING} and checks a
     * {@code time_table} insert against {@code schemaAndValuesForIntervalAsString()}.
     */
    @Test
    @FixFor({"DBZ-1498"})
    public void shouldReceiveChangesForIntervalAsString() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(config -> config.with(PostgresConnectorConfig.INTERVAL_HANDLING_MODE, PostgresConnectorConfig.IntervalHandlingMode.STRING));

        consumer = testConsumer(1, new String[0]);
        consumer.expects(1);

        String timeInsert = "INSERT INTO time_table(ts, tsneg, ts_ms, ts_us, tz, date, ti, tip, ttf, ttz, tptz, it, ts_large, ts_large_us, ts_large_ms, tz_large, ts_max, ts_min, tz_max, tz_min, ts_pinf, ts_ninf, tz_pinf, tz_ninf) VALUES ('2016-11-04T13:51:30.123456'::TIMESTAMP, '1936-10-25T22:10:12.608'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456'::TIMESTAMP, '2016-11-04T13:51:30.123456+02:00'::TIMESTAMPTZ, '2016-11-04'::DATE, '13:51:30'::TIME, '13:51:30.123'::TIME, '24:00:00'::TIME, '13:51:30.123789+02:00'::TIMETZ, '13:51:30.123+02:00'::TIMETZ, 'P1Y2M3DT4H5M6.78S'::INTERVAL,'21016-11-04T13:51:30.123456'::TIMESTAMP, '21016-11-04T13:51:30.123457'::TIMESTAMP, '21016-11-04T13:51:30.124'::TIMESTAMP,'21016-11-04T13:51:30.123456+07:00'::TIMESTAMPTZ,'294247-01-01T23:59:59.999999'::TIMESTAMP,'4713-12-31T23:59:59.999999 BC'::TIMESTAMP,'294247-01-01T23:59:59.999999+00:00'::TIMESTAMPTZ,'4714-12-31T23:59:59.999999Z BC'::TIMESTAMPTZ,'infinity'::TIMESTAMP,'-infinity'::TIMESTAMP,'infinity'::TIMESTAMPTZ,'-infinity'::TIMESTAMPTZ)";
        assertInsert(timeInsert, 1, schemaAndValuesForIntervalAsString());
    }

    /**
     * Inserts into a freshly created table, stops the connector, alters the table while the
     * connector is down, restarts without a snapshot and checks that a further insert is
     * still received.
     */
    @Test
    @FixFor({"DBZ-766"})
    public void shouldReceiveChangesAfterConnectionRestart() throws Exception {
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();

        // Same customization for both connector runs.
        Function<Configuration.Builder, Configuration.Builder> customizer = config -> config
                .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
                .with(PostgresConnectorConfig.SCHEMA_EXCLUDE_LIST, "postgis");

        // First run (with snapshot): create the table and capture one insert.
        startConnector(customizer);
        TestHelper.execute("CREATE TABLE t0 (pk SERIAL, d INTEGER, PRIMARY KEY(pk));", new String[0]);
        consumer = testConsumer(1, new String[0]);
        waitForStreamingToStart();
        executeAndWait("INSERT INTO t0 (pk,d) VALUES(1,1);");
        assertRecordInserted("public.t0", "pk", 1);

        // Change the table structure while the connector is stopped.
        stopConnector();
        TestHelper.execute("ALTER TABLE t0 ADD COLUMN d2 INTEGER;", new String[0]);
        TestHelper.execute("ALTER TABLE t0 ALTER COLUMN d SET NOT NULL;", new String[0]);

        // Second run (no snapshot): the insert using the new column must arrive.
        startConnector(customizer, false);
        consumer = testConsumer(1, new String[0]);
        waitForStreamingToStart();
        executeAndWait("INSERT INTO t0 (pk,d,d2) VALUES (2,1,3);");
        assertRecordInserted("public.t0", "pk", 2);
    }

    /**
     * Same restart scenario as the DBZ-766 test, but with {@code DROP_SLOT_ON_STOP} disabled and
     * {@code SCHEMA_REFRESH_MODE} set to {@code COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST}: a row
     * inserted while the connector is down and a row inserted after the restart are both expected.
     */
    @Test
    @FixFor({"DBZ-1698"})
    public void shouldReceiveUpdateSchemaAfterConnectionRestart() throws Exception {
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();

        // Same customization for both connector runs.
        Function<Configuration.Builder, Configuration.Builder> customizer = config -> config
                .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
                .with(PostgresConnectorConfig.SCHEMA_EXCLUDE_LIST, "postgis")
                .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false)
                .with(PostgresConnectorConfig.SCHEMA_REFRESH_MODE, PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST);

        // First run (with snapshot): create the table and capture one insert.
        startConnector(customizer);
        TestHelper.execute("CREATE TABLE t0 (pk SERIAL, d INTEGER, PRIMARY KEY(pk));", new String[0]);
        consumer = testConsumer(1, new String[0]);
        waitForStreamingToStart();
        executeAndWait("INSERT INTO t0 (pk,d) VALUES(1,1);");
        assertRecordInserted("public.t0", "pk", 1);

        // While the connector is down: insert a row, then change the table structure.
        stopConnector();
        Thread.sleep(3000L);
        TestHelper.execute("INSERT INTO t0 (pk,d) VALUES(2,2);", new String[0]);
        TestHelper.execute("ALTER TABLE t0 ADD COLUMN d2 NUMERIC(10,6) DEFAULT 0 NOT NULL;", new String[0]);
        TestHelper.execute("ALTER TABLE t0 ALTER COLUMN d SET NOT NULL;", new String[0]);

        // Second run (no snapshot): expect the offline insert plus the new one.
        startConnector(customizer, false);
        consumer = testConsumer(2, new String[0]);
        waitForStreamingToStart();
        executeAndWait("INSERT INTO t0 (pk,d,d2) VALUES (3,1,3);");
        assertRecordInserted("public.t0", "pk", 2);
        assertRecordInserted("public.t0", "pk", 3);

        // The slot is not dropped on stop in this test, so clean up explicitly.
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
    }

    /**
     * With {@code INCLUDE_UNKNOWN_DATATYPES} enabled, checks an insert into {@code custom_table}
     * against {@code schemasAndValuesForCustomTypes()}.
     */
    @Test
    public void shouldReceiveChangesForInsertsCustomTypes() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(config -> config.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true));

        String customInsert = "INSERT INTO custom_table (lt, i, n, lt_array) VALUES ('Top.Collections.Pictures.Astronomy.Galaxies', '978-0-393-04002-9', NULL, '{\"Ship.Frigate\",\"Ship.Destroyer\"}')";
        assertInsert(customInsert, 1, schemasAndValuesForCustomTypes());
    }

    /**
     * Checks the temporal, money and bit columns of the update's "before" struct when the
     * connector runs with {@code TemporalPrecisionMode.CONNECT}. Nothing is asserted when the
     * helper returns no "before" struct.
     */
    @Test
    @FixFor({"DBZ-1141"})
    public void shouldProcessNotNullColumnsConnectDateTypes() throws Exception {
        final Struct beforeImage = testProcessNotNullColumns(TemporalPrecisionMode.CONNECT);
        if (beforeImage == null) {
            return;
        }

        Assertions.assertThat(beforeImage.get("created_at")).isEqualTo(new Date(0L));
        Assertions.assertThat(beforeImage.get("created_at_tz")).isEqualTo("1970-01-01T00:00:00Z");
        Assertions.assertThat(beforeImage.get("ctime")).isEqualTo(new Date(0L));
        Assertions.assertThat(beforeImage.get("ctime_tz")).isEqualTo("00:00:00Z");
        Assertions.assertThat(beforeImage.get("cdate")).isEqualTo(new Date(0L));
        Assertions.assertThat(beforeImage.get("cmoney")).isEqualTo(new BigDecimal("0.00"));
        Assertions.assertThat(beforeImage.get("cbits")).isEqualTo(new byte[0]);
    }

    /**
     * Checks the temporal, money and bit columns of the update's "before" struct when the
     * connector runs with {@code TemporalPrecisionMode.ADAPTIVE}. Nothing is asserted when the
     * helper returns no "before" struct.
     */
    @Test
    @FixFor({"DBZ-1141"})
    public void shouldProcessNotNullColumnsAdaptiveDateTypes() throws Exception {
        final Struct beforeImage = testProcessNotNullColumns(TemporalPrecisionMode.ADAPTIVE);
        if (beforeImage == null) {
            return;
        }

        Assertions.assertThat(beforeImage.get("created_at")).isEqualTo(0L);
        Assertions.assertThat(beforeImage.get("created_at_tz")).isEqualTo("1970-01-01T00:00:00Z");
        Assertions.assertThat(beforeImage.get("ctime")).isEqualTo(0L);
        Assertions.assertThat(beforeImage.get("ctime_tz")).isEqualTo("00:00:00Z");
        Assertions.assertThat(beforeImage.get("cdate")).isEqualTo(0);
        Assertions.assertThat(beforeImage.get("cmoney")).isEqualTo(new BigDecimal("0.00"));
        Assertions.assertThat(beforeImage.get("cbits")).isEqualTo(new byte[0]);
    }

    /**
     * Checks the temporal, money and bit columns of the update's "before" struct when the
     * connector runs with {@code TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS}. Nothing is
     * asserted when the helper returns no "before" struct.
     */
    @Test
    @FixFor({"DBZ-1141"})
    public void shouldProcessNotNullColumnsAdaptiveMsDateTypes() throws Exception {
        final Struct beforeImage = testProcessNotNullColumns(TemporalPrecisionMode.ADAPTIVE_TIME_MICROSECONDS);
        if (beforeImage == null) {
            return;
        }

        Assertions.assertThat(beforeImage.get("created_at")).isEqualTo(0L);
        Assertions.assertThat(beforeImage.get("created_at_tz")).isEqualTo("1970-01-01T00:00:00Z");
        Assertions.assertThat(beforeImage.get("ctime")).isEqualTo(0L);
        Assertions.assertThat(beforeImage.get("ctime_tz")).isEqualTo("00:00:00Z");
        Assertions.assertThat(beforeImage.get("cdate")).isEqualTo(0);
        Assertions.assertThat(beforeImage.get("cmoney")).isEqualTo(new BigDecimal("0.00"));
        Assertions.assertThat(beforeImage.get("cbits")).isEqualTo(new byte[0]);
    }

    /**
     * Checks the non-temporal columns of the update's "before" struct (run with
     * {@code TemporalPrecisionMode.ADAPTIVE}) against zero / empty values. Nothing is asserted
     * when the helper returns no "before" struct.
     */
    @Test
    @FixFor({"DBZ-1158"})
    public void shouldProcessNotNullColumnsFallbacksReplicaIdentity() throws Exception {
        final Struct beforeImage = testProcessNotNullColumns(TemporalPrecisionMode.ADAPTIVE);
        if (beforeImage == null) {
            return;
        }

        // numeric and boolean columns
        Assertions.assertThat(beforeImage.get("csmallint")).isEqualTo((short) 0);
        Assertions.assertThat(beforeImage.get("cinteger")).isEqualTo(0);
        Assertions.assertThat(beforeImage.get("cbigint")).isEqualTo(0L);
        Assertions.assertThat(beforeImage.get("creal")).isEqualTo(Float.valueOf(0.0f));
        Assertions.assertThat(beforeImage.get("cbool")).isEqualTo(false);
        Assertions.assertThat(beforeImage.get("cfloat8")).isEqualTo(Double.valueOf(0.0d));
        Assertions.assertThat(beforeImage.get("cnumeric")).isEqualTo(new BigDecimal("0.00"));
        Assertions.assertThat(beforeImage.get("cvarchar")).isEqualTo("");

        // geometric columns and interval
        Assertions.assertThat(beforeImage.get("cbox")).isEqualTo(new byte[0]);
        Assertions.assertThat(beforeImage.get("ccircle")).isEqualTo(new byte[0]);
        Assertions.assertThat(beforeImage.get("cinterval")).isEqualTo(0L);
        Assertions.assertThat(beforeImage.get("cline")).isEqualTo(new byte[0]);
        Assertions.assertThat(beforeImage.get("clseg")).isEqualTo(new byte[0]);
        Assertions.assertThat(beforeImage.get("cpath")).isEqualTo(new byte[0]);
        Assertions.assertThat(beforeImage.get("cpoint")).isEqualTo(Point.createValue(Point.builder().build(), 0.0d, 0.0d));
        Assertions.assertThat(beforeImage.get("cpolygon")).isEqualTo(new byte[0]);

        // textual columns
        Assertions.assertThat(beforeImage.get("cchar")).isEqualTo("");
        Assertions.assertThat(beforeImage.get("ctext")).isEqualTo("");
        Assertions.assertThat(beforeImage.get("cjson")).isEqualTo("");
        Assertions.assertThat(beforeImage.get("cxml")).isEqualTo("");
        Assertions.assertThat(beforeImage.get("cuuid")).isEqualTo("");

        // bit string and network address columns
        Assertions.assertThat(beforeImage.get("cvarbit")).isEqualTo(new byte[0]);
        Assertions.assertThat(beforeImage.get("cinet")).isEqualTo("");
        Assertions.assertThat(beforeImage.get("ccidr")).isEqualTo("");
        Assertions.assertThat(beforeImage.get("cmacaddr")).isEqualTo("");
    }

    /**
     * Inserts a fully populated row into {@code not_null_table}, updates its {@code val} column
     * and returns the "before" struct of the resulting update record.
     *
     * @param temporalPrecisionMode value configured as {@code TIME_PRECISION_MODE} for the connector
     * @return the "before" struct of the update record; callers in this class treat {@code null}
     *         as "nothing to verify"
     */
    private Struct testProcessNotNullColumns(TemporalPrecisionMode temporalPrecisionMode) throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(config -> config
                .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
                .with(PostgresConnectorConfig.SCHEMA_EXCLUDE_LIST, "postgis")
                .with(PostgresConnectorConfig.TIME_PRECISION_MODE, temporalPrecisionMode));

        // Insert the row and discard its record; only the following update is of interest.
        String insertRow = "INSERT INTO not_null_table VALUES (default, 30, '2019-02-10 11:34:58', '2019-02-10 11:35:00', '10:20:11', '10:20:12', '2019-02-01', '$20', B'101', 32766, 2147483646, 9223372036854775806, 3.14, true, 3.14768, 1234.56, 'Test', '(0,0),(1,1)', '<(0,0),1>', '01:02:03', '{0,1,2}', '((0,0),(1,1))', '((0,0),(0,1),(0,2))', '(1,1)', '((0,0),(0,1),(1,1))', 'a', 'hello world', '{\"key\": 123}', '<doc><item>abc</item></doc>', 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11', B'101', '192.168.1.100', '192.168.1', '08:00:2b:01:02:03');";
        consumer.expects(1);
        executeAndWait(insertRow);
        consumer.remove();

        consumer.expects(1);
        executeAndWait("UPDATE not_null_table SET val=40");
        final SourceRecord updateRecord = consumer.remove();
        VerifyRecord.isValidUpdate(updateRecord, "pk", 1);
        VerifyRecord.isValid(updateRecord);

        Struct envelope = (Struct) updateRecord.value();
        return envelope.getStruct("before");
    }

    /**
     * Waits until the record for a marker insert into {@code public.dummy_table} has been
     * consumed, then checks an insert into {@code public.postgis_table} against
     * {@code schemaAndValuesForPostgisTypes()}.
     */
    @Test(timeout = 30000)
    public void shouldReceiveChangesForInsertsWithPostgisTypes() throws Exception {
        TestHelper.executeDDL("postgis_create_tables.ddl");
        startConnector();

        consumer = testConsumer(1, "public");
        consumer.setIgnoreExtraRecords(true);
        TestHelper.execute("INSERT INTO public.dummy_table DEFAULT VALUES;", new String[0]);
        consumer.await(TestHelper.waitTimeForRecords() * 10, TimeUnit.SECONDS);

        // Drain records until the dummy_table marker shows up.
        // NOTE(review): this polls without sleeping; the only bound is the 30s test timeout.
        boolean markerSeen = false;
        while (!markerSeen) {
            if (consumer.isEmpty()) {
                continue;
            }
            markerSeen = consumer.remove().topic().endsWith(".public.dummy_table");
        }

        consumer.expects(1);
        String postgisInsert = "INSERT INTO public.postgis_table (p, ml) VALUES ('SRID=3187;POINT(174.9479 -36.7208)'::postgis.geometry, 'MULTILINESTRING((169.1321 -44.7032, 167.8974 -44.6414))'::postgis.geography)";
        assertInsert(postgisInsert, 1, schemaAndValuesForPostgisTypes());
    }

    /**
     * Waits until the record for a marker insert into {@code public.dummy_table} has been
     * consumed, then checks an insert into {@code public.postgis_array_table} against
     * {@code schemaAndValuesForPostgisArrayTypes()}.
     */
    @Test(timeout = 30000)
    public void shouldReceiveChangesForInsertsWithPostgisArrayTypes() throws Exception {
        TestHelper.executeDDL("postgis_create_tables.ddl");
        startConnector();

        consumer = testConsumer(1, "public");
        consumer.setIgnoreExtraRecords(true);
        TestHelper.execute("INSERT INTO public.dummy_table DEFAULT VALUES;", new String[0]);
        consumer.await(TestHelper.waitTimeForRecords() * 10, TimeUnit.SECONDS);

        // Drain records until the dummy_table marker shows up.
        // NOTE(review): this polls without sleeping; the only bound is the 30s test timeout.
        boolean markerSeen = false;
        while (!markerSeen) {
            if (consumer.isEmpty()) {
                continue;
            }
            markerSeen = consumer.remove().topic().endsWith(".public.dummy_table");
        }

        consumer.expects(1);
        String postgisArrayInsert = "INSERT INTO public.postgis_array_table (ga, gann) VALUES (ARRAY['GEOMETRYCOLLECTION EMPTY'::postgis.geometry, 'POLYGON((166.51 -46.64, 178.52 -46.64, 178.52 -34.45, 166.51 -34.45, 166.51 -46.64))'::postgis.geometry], ARRAY['GEOMETRYCOLLECTION EMPTY'::postgis.geometry, 'POLYGON((166.51 -46.64, 178.52 -46.64, 178.52 -34.45, 166.51 -34.45, 166.51 -46.64))'::postgis.geometry])";
        assertInsert(postgisArrayInsert, 1, schemaAndValuesForPostgisArrayTypes());
    }

    /**
     * Checks an insert into a table whose schema, table and column names contain quotes, dots
     * and spaces against {@code schemasAndValuesForQuotedTypes()}.
     */
    @Test
    @ShouldFailWhen(DecoderDifferences.AreQuotedIdentifiersUnsupported.class)
    public void shouldReceiveChangesForInsertsWithQuotedNames() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector();

        String quotedInsert = "INSERT INTO \"Quoted_\"\" . Schema\".\"Quoted_\"\" . Table\" (\"Quoted_\"\" . Text_Column\") VALUES ('some text')";
        assertInsert(quotedInsert, 1, schemasAndValuesForQuotedTypes());
    }

    /**
     * Checks an insert that populates every column of {@code array_table} against
     * {@code schemasAndValuesForArrayTypes()}.
     */
    @Test
    public void shouldReceiveChangesForInsertsWithArrayTypes() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector();

        String arrayInsert = "INSERT INTO array_table (int_array, bigint_array, text_array, char_array, varchar_array, date_array, numeric_array, varnumeric_array, citext_array, inet_array, cidr_array, macaddr_array, tsrange_array, tstzrange_array, daterange_array, int4range_array, numerange_array, int8range_array, uuid_array, json_array, jsonb_array) VALUES ('{1,2,3}', '{1550166368505037572}', '{\"one\",\"two\",\"three\"}', '{\"cone\",\"ctwo\",\"cthree\"}', '{\"vcone\",\"vctwo\",\"vcthree\"}', '{2016-11-04,2016-11-05,2016-11-06}', '{1.2,3.4,5.6}', '{1.1,2.22,3.333}', '{\"four\",\"five\",\"six\"}', '{\"192.168.2.0/12\",\"192.168.1.1\",\"192.168.0.2/1\"}', '{\"192.168.100.128/25\", \"192.168.0.0/25\", \"192.168.1.0/24\"}', '{\"08:00:2b:01:02:03\", \"08-00-2b-01-02-03\", \"08002b:010203\"}','{\"[2019-03-31 15:30:00, infinity)\", \"[2019-03-31 15:30:00, 2019-04-30 15:30:00]\"}', '{\"[2017-06-05 11:29:12.549426+00,)\", \"[2017-06-05 11:29:12.549426+00, 2017-06-05 12:34:56.789012+00]\"}', '{\"[2019-03-31, infinity)\", \"[2019-03-31, 2019-04-30)\"}', '{\"[1,6)\", \"[1,4)\"}', '{\"[5.3,6.3)\", \"[10.0,20.0)\"}', '{\"[1000000,6000000)\", \"[5000,9000)\"}', '{\"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11\", \"f0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11\"}',array['{\"bar\": \"baz\"}','{\"foo\": \"qux\"}']::json[], array['{\"bar\": \"baz\"}','{\"foo\": \"qux\"}']::jsonb[])";
        assertInsert(arrayInsert, 1, schemasAndValuesForArrayTypes());
    }

    /**
     * Checks that inserts into {@code test_table} are received for each combination of
     * replica identity (DEFAULT / FULL) and primary key (present / dropped). Skipped for the
     * pgoutput decoder.
     */
    @Test
    @SkipWhenDecoderPluginNameIs(value = SkipWhenDecoderPluginNameIs.DecoderPluginName.PGOUTPUT, reason = "Decoder synchronizes all schema columns when processing relation messages")
    @FixFor({"DBZ-1029"})
    public void shouldReceiveChangesForInsertsIndependentOfReplicaIdentity() throws Exception {
        startConnector();

        // primary key present, replica identity DEFAULT
        TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;", new String[0]);
        List<AbstractRecordsProducerTest.SchemaAndValueField> pkAndDefault = Collections.singletonList(
                new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "pk_and_default"));
        assertInsert("INSERT INTO test_table (text) VALUES ('pk_and_default');", 2, pkAndDefault);

        // primary key present, replica identity FULL
        consumer.expects(1);
        TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY FULL;", new String[0]);
        List<AbstractRecordsProducerTest.SchemaAndValueField> pkAndFull = Collections.singletonList(
                new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "pk_and_full"));
        assertInsert("INSERT INTO test_table (text) VALUES ('pk_and_full');", 3, pkAndFull);

        // primary key dropped, replica identity still FULL
        consumer.expects(1);
        TestHelper.execute("ALTER TABLE test_table DROP CONSTRAINT test_table_pkey CASCADE;", new String[0]);
        List<AbstractRecordsProducerTest.SchemaAndValueField> noPkAndFull = Collections.singletonList(
                new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "no_pk_and_full"));
        assertInsert("INSERT INTO test_table (pk, text) VALUES (4, 'no_pk_and_full');", 4, noPkAndFull);

        // primary key dropped, replica identity DEFAULT
        consumer.expects(1);
        TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;", new String[0]);
        List<AbstractRecordsProducerTest.SchemaAndValueField> noPkAndDefault = Collections.singletonList(
                new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "no_pk_and_default"));
        assertInsert("INSERT INTO test_table (pk, text) VALUES (5, 'no_pk_and_default');", 5, noPkAndDefault);
    }

    /**
     * pgoutput-only variant of the replica-identity test: once the primary key has been
     * dropped, the inserts are checked with the two-argument {@code assertInsert}, with
     * {@code pk} listed as a regular expected column.
     */
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Decoder synchronizes all schema columns when processing relation messages")
    @FixFor({"DBZ-1029"})
    public void shouldReceiveChangesForInsertsIndependentOfReplicaIdentityWhenSchemaChanged() throws Exception {
        startConnector();

        // primary key present, replica identity DEFAULT
        TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;", new String[0]);
        List<AbstractRecordsProducerTest.SchemaAndValueField> pkAndDefault = Collections.singletonList(
                new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "pk_and_default"));
        assertInsert("INSERT INTO test_table (text) VALUES ('pk_and_default');", 2, pkAndDefault);

        // primary key present, replica identity FULL
        consumer.expects(1);
        TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY FULL;", new String[0]);
        List<AbstractRecordsProducerTest.SchemaAndValueField> pkAndFull = Collections.singletonList(
                new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "pk_and_full"));
        assertInsert("INSERT INTO test_table (text) VALUES ('pk_and_full');", 3, pkAndFull);

        // primary key dropped, replica identity still FULL
        consumer.expects(1);
        TestHelper.execute("ALTER TABLE test_table DROP CONSTRAINT test_table_pkey CASCADE;", new String[0]);
        List<AbstractRecordsProducerTest.SchemaAndValueField> noPkAndFull = Arrays.asList(
                new AbstractRecordsProducerTest.SchemaAndValueField("pk", SchemaBuilder.INT32_SCHEMA, 4),
                new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "no_pk_and_full"));
        assertInsert("INSERT INTO test_table (pk, text) VALUES (4, 'no_pk_and_full');", noPkAndFull);

        // primary key dropped, replica identity DEFAULT
        consumer.expects(1);
        TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;", new String[0]);
        List<AbstractRecordsProducerTest.SchemaAndValueField> noPkAndDefault = Arrays.asList(
                new AbstractRecordsProducerTest.SchemaAndValueField("pk", SchemaBuilder.INT32_SCHEMA, 5),
                new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "no_pk_and_default"));
        assertInsert("INSERT INTO test_table (pk, text) VALUES (5, 'no_pk_and_default');", noPkAndDefault);
    }

    /**
     * Checks an insert that sets every column of {@code array_table_with_nulls} to
     * {@code null} against {@code schemasAndValuesForArrayTypesWithNullValues()}.
     */
    @Test
    @FixFor({"DBZ-478"})
    public void shouldReceiveChangesForNullInsertsWithArrayTypes() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector();

        String nullArrayInsert = "INSERT INTO array_table_with_nulls (int_array, bigint_array, text_array, date_array, numeric_array, varnumeric_array, citext_array, inet_array, cidr_array, macaddr_array, tsrange_array, tstzrange_array, daterange_array, int4range_array, numerange_array, int8range_array, uuid_array, json_array, jsonb_array) VALUES (null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)";
        assertInsert(nullArrayInsert, 1, schemasAndValuesForArrayTypesWithNullValues());
    }

    /**
     * Creates schema {@code s1} and table {@code s1.a} after the connector has started and
     * checks that the insert into the new table is received.
     */
    @Test
    public void shouldReceiveChangesForNewTable() throws Exception {
        startConnector();

        String createAndInsert = "CREATE SCHEMA s1;CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (11);";
        executeAndWait(createAndInsert);

        assertRecordInserted("s1.a", "pk", 1);
    }

    /**
     * Renames {@code test_table} to {@code renamed_test_table} after the connector has started
     * and checks that an insert into the renamed table is received under the new name.
     */
    @Test
    public void shouldReceiveChangesForRenamedTable() throws Exception {
        startConnector();

        String renameAndInsert = "DROP TABLE IF EXISTS renamed_test_table;ALTER TABLE test_table RENAME TO renamed_test_table;INSERT INTO renamed_test_table (text) VALUES ('new');";
        executeAndWait(renameAndInsert);

        // pk 2: the setup in before() already inserted the first row.
        assertRecordInserted("public.renamed_test_table", "pk", 2);
    }

    /**
     * Walks {@code test_table} through four update scenarios and checks the emitted records:
     * <ol>
     * <li>a plain update, checking the "after" image;</li>
     * <li>an update after switching to {@code REPLICA IDENTITY FULL}, checking "before" and "after";</li>
     * <li>an update after the primary key constraint has been dropped, checking "before" and "after";</li>
     * <li>an update after switching back to {@code REPLICA IDENTITY DEFAULT}, for which no record is expected.</li>
     * </ol>
     */
    @Test
    @SkipWhenDecoderPluginNameIs(value = SkipWhenDecoderPluginNameIs.DecoderPluginName.PGOUTPUT, reason = "An update on a table with no primary key and default replica throws PSQLException as tables must have a PK")
    public void shouldReceiveChangesForUpdates() throws Exception {
        startConnector();

        // Scenario 1: plain update.
        executeAndWait("UPDATE test_table set text='update' WHERE pk=1");
        SourceRecord plainUpdate = consumer.remove();
        String topic = TestHelper.topicName("public.test_table");
        TestCase.assertEquals(topic, plainUpdate.topic());
        VerifyRecord.isValidUpdate(plainUpdate, "pk", 1);
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update")),
                plainUpdate,
                "after");

        // Scenario 2: replica identity FULL, so the previous value is asserted as well.
        consumer.expects(1);
        TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY FULL", new String[0]);
        executeAndWait("UPDATE test_table set text='update2' WHERE pk=1");
        SourceRecord fullIdentityUpdate = consumer.remove();
        TestCase.assertEquals(topic, fullIdentityUpdate.topic());
        VerifyRecord.isValidUpdate(fullIdentityUpdate, "pk", 1);
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update")),
                fullIdentityUpdate,
                "before");
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update2")),
                fullIdentityUpdate,
                "after");

        // Scenario 3: primary key dropped while replica identity is still FULL.
        TestHelper.execute("ALTER TABLE test_table DROP CONSTRAINT test_table_pkey CASCADE;", new String[0]);
        consumer.expects(1);
        executeAndWait("UPDATE test_table SET text = 'update3' WHERE pk = 1;");
        SourceRecord noPkUpdate = consumer.remove();
        TestCase.assertEquals(topic, noPkUpdate.topic());
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update2")),
                noPkUpdate,
                "before");
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update3")),
                noPkUpdate,
                "after");

        // Scenario 4: no primary key and default replica identity; the consumer must stay empty.
        TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;", new String[0]);
        consumer.expects(0);
        executeAndWaitForNoRecords("UPDATE test_table SET text = 'no_pk_and_default' WHERE pk = 1;");
        Assertions.assertThat(consumer.isEmpty()).isTrue();
    }

    /**
     * Applies a series of column-level DDL changes to {@code test_table} (add, rename, drop,
     * add again, change type), each followed by a DML statement, and checks that the record
     * emitted for that DML statement reflects the altered column.
     */
    @Test
    public void shouldReceiveChangesForUpdatesWithColumnChanges() throws Exception {
        startConnector();
        consumer = testConsumer(1, new String[0]);

        // Add column "uvc" and set it; with REPLICA IDENTITY FULL the "before" image is checked too.
        executeAndWait("ALTER TABLE test_table ADD COLUMN uvc VARCHAR(2);ALTER TABLE test_table REPLICA IDENTITY FULL;UPDATE test_table SET uvc ='aa' WHERE pk = 1;");
        SourceRecord afterAdd = consumer.remove();
        TestCase.assertEquals(TestHelper.topicName("public.test_table"), afterAdd.topic());
        VerifyRecord.isValidUpdate(afterAdd, "pk", 1);
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("uvc", null, null)),
                afterAdd,
                "before");
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("uvc", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "aa")),
                afterAdd,
                "after");

        // Rename "uvc" to "xvc" and update it.
        consumer.expects(1);
        executeAndWait("ALTER TABLE test_table RENAME COLUMN uvc to xvc;UPDATE test_table SET xvc ='bb' WHERE pk = 1;");
        SourceRecord afterRename = consumer.remove();
        VerifyRecord.isValidUpdate(afterRename, "pk", 1);
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("xvc", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "aa")),
                afterRename,
                "before");
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("xvc", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "bb")),
                afterRename,
                "after");

        // Drop "xvc"; only the validity of the update record is checked here.
        consumer.expects(1);
        executeAndWait("ALTER TABLE test_table DROP COLUMN xvc;UPDATE test_table SET text ='update' WHERE pk = 1;");
        VerifyRecord.isValidUpdate(consumer.remove(), "pk", 1);

        // Add INTEGER column "modtype" and insert a new row.
        consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ADD COLUMN modtype INTEGER;INSERT INTO test_table (pk,modtype) VALUES (2,1);");
        SourceRecord afterIntegerColumn = consumer.remove();
        VerifyRecord.isValidInsert(afterIntegerColumn, "pk", 2);
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("modtype", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 1)),
                afterIntegerColumn,
                "after");

        // Change "modtype" to SMALLINT; both images are expected with the INT16 schema.
        consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ALTER COLUMN modtype TYPE SMALLINT;UPDATE test_table SET modtype = 2 WHERE pk = 2;");
        SourceRecord afterTypeChange = consumer.remove();
        VerifyRecord.isValidUpdate(afterTypeChange, "pk", 2);
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("modtype", SchemaBuilder.OPTIONAL_INT16_SCHEMA, (short) 1)),
                afterTypeChange,
                "before");
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("modtype", SchemaBuilder.OPTIONAL_INT16_SCHEMA, (short) 2)),
                afterTypeChange,
                "after");
    }

    /**
     * Looks up the {@code __debezium.newkey} header of the given record.
     *
     * @param sourceRecord record whose headers are searched
     * @return the header returned by {@link #getHeaderField(SourceRecord, String)}
     */
    private Header getPKUpdateNewKeyHeader(SourceRecord sourceRecord) {
        final String newKeyHeaderName = "__debezium.newkey";
        return getHeaderField(sourceRecord, newKeyHeaderName);
    }

    /**
     * Looks up the {@code __debezium.oldkey} header of the given record.
     *
     * @param sourceRecord record whose headers are searched
     * @return the header returned by {@link #getHeaderField(SourceRecord, String)}
     */
    private Header getPKUpdateOldKeyHeader(SourceRecord sourceRecord) {
        final String oldKeyHeaderName = "__debezium.oldkey";
        return getHeaderField(sourceRecord, oldKeyHeaderName);
    }

    /**
     * Returns the first header of the record whose key equals the given name.
     *
     * @param sourceRecord record whose headers are searched
     * @param str          header key to look for; must not be {@code null}
     * @return the first matching header
     * @throws AssertionError if the record carries no header with that key, so the calling
     *                        test fails with a readable message instead of an index error
     */
    private Header getHeaderField(SourceRecord sourceRecord, String str) {
        return StreamSupport.stream(sourceRecord.headers().spliterator(), false)
                .filter(header -> str.equals(header.key()))
                .findFirst()
                .orElseThrow(() -> new AssertionError("Record has no header named '" + str + "'"));
    }

    /**
     * Updates the primary key of the row in {@code test_table} from 1 to 2 and expects three
     * records in order: a delete for the old key (carrying the new key in a header), a
     * tombstone for the old key, and an insert for the new key (carrying the old key in a header).
     */
    @Test
    public void shouldReceiveChangesForUpdatesWithPKChanges() throws Exception {
        startConnector();
        this.consumer = testConsumer(3, new String[0]);
        executeAndWait("UPDATE test_table SET text = 'update', pk = 2");
        String topicName = TestHelper.topicName("public.test_table");

        // Delete event for the old key; its header references the new key.
        SourceRecord deleteRecord = this.consumer.remove();
        TestCase.assertEquals(topicName, deleteRecord.topic());
        VerifyRecord.isValidDelete(deleteRecord, "pk", 1);
        // The expected value is boxed explicitly: Struct.getInt32 yields an Integer (Kafka Connect
        // API), and assertEquals(int literal, Integer) is ambiguous between the primitive and
        // Object overloads.
        TestCase.assertEquals(Integer.valueOf(2), ((Struct) getPKUpdateNewKeyHeader(deleteRecord).value()).getInt32("pk"));

        // Tombstone for the old key.
        SourceRecord tombstoneRecord = this.consumer.remove();
        TestCase.assertEquals(topicName, tombstoneRecord.topic());
        VerifyRecord.isValidTombstone(tombstoneRecord, "pk", 1);

        // Insert event for the new key; its header references the old key.
        SourceRecord insertRecord = this.consumer.remove();
        TestCase.assertEquals(topicName, insertRecord.topic());
        VerifyRecord.isValidInsert(insertRecord, "pk", 2);
        TestCase.assertEquals(Integer.valueOf(1), ((Struct) getPKUpdateOldKeyHeader(insertRecord).value()).getInt32("pk"));
    }

    /**
     * Same primary-key update as {@code shouldReceiveChangesForUpdatesWithPKChanges}, but with
     * {@code TOMBSTONES_ON_DELETE} set to {@code false}: only a delete for the old key and an
     * insert for the new key are expected.
     */
    @Test
    @FixFor({"DBZ-582"})
    public void shouldReceiveChangesForUpdatesWithPKChangesWithoutTombstone() throws Exception {
        startConnector(builder -> builder
                .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false));
        this.consumer = testConsumer(2, new String[0]);
        executeAndWait("UPDATE test_table SET text = 'update', pk = 2");
        String topicName = TestHelper.topicName("public.test_table");

        // Delete event for the old key; its header references the new key.
        SourceRecord deleteRecord = this.consumer.remove();
        TestCase.assertEquals(topicName, deleteRecord.topic());
        VerifyRecord.isValidDelete(deleteRecord, "pk", 1);
        // The expected value is boxed explicitly: Struct.getInt32 yields an Integer (Kafka Connect
        // API), and assertEquals(int literal, Integer) is ambiguous between the primitive and
        // Object overloads.
        TestCase.assertEquals(Integer.valueOf(2), ((Struct) getPKUpdateNewKeyHeader(deleteRecord).value()).getInt32("pk"));

        // Insert event for the new key; its header references the old key.
        SourceRecord insertRecord = this.consumer.remove();
        TestCase.assertEquals(topicName, insertRecord.topic());
        VerifyRecord.isValidInsert(insertRecord, "pk", 2);
        TestCase.assertEquals(Integer.valueOf(1), ((Struct) getPKUpdateOldKeyHeader(insertRecord).value()).getInt32("pk"));
    }

    /**
     * Adds a column with a {@code DEFAULT} clause to {@code test_table}, inserts a row that does
     * not set it, and expects the insert record to carry the default in its "after" image.
     */
    @Test
    public void shouldReceiveChangesForDefaultValues() throws Exception {
        startConnector();
        consumer = testConsumer(1, new String[0]);

        final String addDefaultColumnAndInsert = "ALTER TABLE test_table REPLICA IDENTITY FULL;ALTER TABLE test_table ADD COLUMN default_column TEXT DEFAULT 'default';INSERT INTO test_table (text) VALUES ('update');";
        executeAndWait(addDefaultColumnAndInsert);

        SourceRecord inserted = consumer.remove();
        TestCase.assertEquals(TestHelper.topicName("public.test_table"), inserted.topic());
        VerifyRecord.isValidInsert(inserted, "pk", 2);
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "update"),
                        new AbstractRecordsProducerTest.SchemaAndValueField("default_column", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "default")),
                inserted,
                "after");
    }

    /**
     * Repeatedly alters the type or constraint of the numeric column {@code num_val} on
     * {@code test_table} and, after each change, writes a row and checks that the emitted record
     * uses the expected schema (fixed-scale {@code Decimal} or {@code VariableScaleDecimal}) and value.
     */
    @Test
    public void shouldReceiveChangesForTypeConstraints() throws Exception {
        startConnector();
        consumer = testConsumer(1, new String[0]);

        // NUMERIC(5,2): column added, then populated via an update.
        executeAndWait("ALTER TABLE test_table ADD COLUMN num_val NUMERIC(5,2);ALTER TABLE test_table REPLICA IDENTITY FULL;UPDATE test_table SET num_val = 123.45 WHERE pk = 1;");
        SourceRecord numeric52 = consumer.remove();
        TestCase.assertEquals(TestHelper.topicName("public.test_table"), numeric52.topic());
        VerifyRecord.isValidUpdate(numeric52, "pk", 1);
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", null, null)),
                numeric52,
                "before");
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", Decimal.builder(2).parameter("connect.decimal.precision", "5").optional().build(), new BigDecimal("123.45"))),
                numeric52,
                "after");

        // NUMERIC(6,1): the inserted 123.41 is expected as 123.4.
        consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ALTER COLUMN num_val TYPE NUMERIC(6,1);INSERT INTO test_table (pk,num_val) VALUES (2,123.41);");
        SourceRecord numeric61 = consumer.remove();
        VerifyRecord.isValidInsert(numeric61, "pk", 2);
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", Decimal.builder(1).parameter("connect.decimal.precision", "6").optional().build(), new BigDecimal("123.4"))),
                numeric61,
                "after");

        // NUMERIC without precision/scale: expected as a variable-scale struct.
        consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ALTER COLUMN num_val TYPE NUMERIC;INSERT INTO test_table (pk,num_val) VALUES (3,123.4567);");
        SourceRecord numericUnbounded = consumer.remove();
        Struct expectedNumeric = new Struct(VariableScaleDecimal.schema());
        expectedNumeric.put("scale", 4).put("value", new BigDecimal("123.4567").unscaledValue().toByteArray());
        VerifyRecord.isValidInsert(numericUnbounded, "pk", 3);
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", VariableScaleDecimal.builder().optional().build(), expectedNumeric)),
                numericUnbounded,
                "after");

        // DECIMAL(12,4): the inserted 2.48 is expected as 2.4800.
        consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ALTER COLUMN num_val TYPE DECIMAL(12,4);INSERT INTO test_table (pk,num_val) VALUES (4,2.48);");
        SourceRecord decimal124 = consumer.remove();
        VerifyRecord.isValidInsert(decimal124, "pk", 4);
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", Decimal.builder(4).parameter("connect.decimal.precision", "12").optional().build(), new BigDecimal("2.4800"))),
                decimal124,
                "after");

        // DECIMAL(12): scale 0.
        consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ALTER COLUMN num_val TYPE DECIMAL(12);INSERT INTO test_table (pk,num_val) VALUES (5,1238);");
        SourceRecord decimal12 = consumer.remove();
        VerifyRecord.isValidInsert(decimal12, "pk", 5);
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", Decimal.builder(0).parameter("connect.decimal.precision", "12").optional().build(), new BigDecimal("1238"))),
                decimal12,
                "after");

        // DECIMAL without precision/scale: variable-scale struct again.
        consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ALTER COLUMN num_val TYPE DECIMAL;INSERT INTO test_table (pk,num_val) VALUES (6,1225.1);");
        SourceRecord decimalUnbounded = consumer.remove();
        Struct expectedDecimal = new Struct(VariableScaleDecimal.schema());
        expectedDecimal.put("scale", 1).put("value", new BigDecimal("1225.1").unscaledValue().toByteArray());
        VerifyRecord.isValidInsert(decimalUnbounded, "pk", 6);
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", VariableScaleDecimal.builder().optional().build(), expectedDecimal)),
                decimalUnbounded,
                "after");

        // NOT NULL added: the expected schema is no longer optional. The struct from the previous
        // step is overwritten in place with the new expected scale and value.
        consumer.expects(1);
        executeAndWait("ALTER TABLE test_table ALTER COLUMN num_val SET NOT NULL;INSERT INTO test_table (pk,num_val) VALUES (7,1976);");
        SourceRecord decimalNotNull = consumer.remove();
        expectedDecimal.put("scale", 0).put("value", new BigDecimal("1976").unscaledValue().toByteArray());
        VerifyRecord.isValidInsert(decimalNotNull, "pk", 7);
        assertRecordSchemaAndValues(
                Collections.singletonList(new AbstractRecordsProducerTest.SchemaAndValueField("num_val", VariableScaleDecimal.builder().build(), expectedDecimal)),
                decimalNotNull,
                "after");
    }

    /**
     * Inserts a second row into {@code test_table}, deletes all rows with {@code pk > 0} and
     * expects five records: the insert, then a delete followed by a tombstone for pk 1 and
     * again for pk 2.
     */
    @Test
    public void shouldReceiveChangesForDeletes() throws Exception {
        startConnector();
        consumer = testConsumer(5, new String[0]);
        executeAndWait("INSERT INTO test_table (text) VALUES ('insert2');DELETE FROM test_table WHERE pk > 0;");
        String topic = TestHelper.topicName("public.test_table");

        assertRecordInserted("public.test_table", "pk", 2);

        SourceRecord deleteOfFirst = consumer.remove();
        TestCase.assertEquals(topic, deleteOfFirst.topic());
        VerifyRecord.isValidDelete(deleteOfFirst, "pk", 1);

        SourceRecord tombstoneOfFirst = consumer.remove();
        TestCase.assertEquals(topic, tombstoneOfFirst.topic());
        VerifyRecord.isValidTombstone(tombstoneOfFirst, "pk", 1);

        SourceRecord deleteOfSecond = consumer.remove();
        TestCase.assertEquals(topic, deleteOfSecond.topic());
        VerifyRecord.isValidDelete(deleteOfSecond, "pk", 2);

        SourceRecord tombstoneOfSecond = consumer.remove();
        TestCase.assertEquals(topic, tombstoneOfSecond.topic());
        VerifyRecord.isValidTombstone(tombstoneOfSecond, "pk", 2);
    }

    /**
     * Same scenario as {@code shouldReceiveChangesForDeletes} but with
     * {@code TOMBSTONES_ON_DELETE} set to {@code false}: the insert is followed by the two
     * delete records only.
     */
    @Test
    @FixFor({"DBZ-582"})
    public void shouldReceiveChangesForDeletesWithoutTombstone() throws Exception {
        startConnector(builder -> builder
                .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false));
        consumer = testConsumer(3, new String[0]);
        executeAndWait("INSERT INTO test_table (text) VALUES ('insert2');DELETE FROM test_table WHERE pk > 0;");
        String topic = TestHelper.topicName("public.test_table");

        assertRecordInserted("public.test_table", "pk", 2);

        SourceRecord deleteOfFirst = consumer.remove();
        TestCase.assertEquals(topic, deleteOfFirst.topic());
        VerifyRecord.isValidDelete(deleteOfFirst, "pk", 1);

        SourceRecord deleteOfSecond = consumer.remove();
        TestCase.assertEquals(topic, deleteOfSecond.topic());
        VerifyRecord.isValidDelete(deleteOfSecond, "pk", 2);
    }

    /**
     * Checks delete handling (tombstones disabled) under three table configurations:
     * default replica identity with a primary key, full replica identity without a primary key,
     * and default replica identity without a primary key. In the last case only the insert
     * record is expected and the consumer must be empty afterwards.
     */
    @Test
    @SkipWhenDecoderPluginNameIs(value = SkipWhenDecoderPluginNameIs.DecoderPluginName.PGOUTPUT, reason = "A delete on a table with no primary key and default replica throws PSQLException as tables must have a PK")
    public void shouldReceiveChangesForDeletesDependingOnReplicaIdentity() throws Exception {
        String topic = TestHelper.topicName("public.test_table");
        startConnector(builder -> builder
                .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
                .with(CommonConnectorConfig.TOMBSTONES_ON_DELETE, false));
        consumer = testConsumer(1, new String[0]);

        // Default replica identity, primary key present: one delete record.
        executeAndWait("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;DELETE FROM test_table WHERE pk = 1;");
        SourceRecord deleteWithPk = consumer.remove();
        TestCase.assertEquals(topic, deleteWithPk.topic());
        VerifyRecord.isValidDelete(deleteWithPk, "pk", 1);

        // Full replica identity, primary key dropped: insert followed by delete.
        consumer.expects(2);
        executeAndWait("ALTER TABLE test_table REPLICA IDENTITY FULL;ALTER TABLE test_table DROP CONSTRAINT test_table_pkey CASCADE;INSERT INTO test_table (pk, text) VALUES (2, 'insert2');DELETE FROM test_table WHERE pk = 2;");
        assertRecordInserted("public.test_table", "pk", 2);
        SourceRecord deleteWithFullIdentity = consumer.remove();
        TestCase.assertEquals(topic, deleteWithFullIdentity.topic());
        VerifyRecord.isValidDelete(deleteWithFullIdentity, "pk", 2);

        // Default replica identity, no primary key: only the insert is expected.
        consumer.expects(1);
        executeAndWait("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;INSERT INTO test_table (pk, text) VALUES (3, 'insert3');DELETE FROM test_table WHERE pk = 3;");
        assertRecordInserted("public.test_table", "pk", 3);
        Assertions.assertThat(consumer.isEmpty()).isTrue();
    }

    /**
     * Starts the connector with {@code DECIMAL_HANDLING_MODE = DOUBLE} and checks an insert into
     * {@code numeric_decimal_table} against {@code schemasAndValuesForDoubleEncodedNumericTypes()}.
     */
    @Test
    public void shouldReceiveNumericTypeAsDouble() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE));

        final String insertNumerics = "INSERT INTO numeric_decimal_table (d, dzs, dvs, d_nn, n, nzs, nvs, d_int, dzs_int, dvs_int, n_int, nzs_int, nvs_int, d_nan, dzs_nan, dvs_nan, n_nan, nzs_nan, nvs_nan) VALUES (1.1, 10.11, 10.1111, 3.30, 22.22, 22.2, 22.2222, 1, 10, 10, 22, 22, 22, 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN')";
        assertInsert(insertNumerics, 1, schemasAndValuesForDoubleEncodedNumericTypes());
    }

    /**
     * Starts the connector with {@code DECIMAL_HANDLING_MODE = STRING} and checks an insert into
     * {@code numeric_decimal_table} against {@code schemasAndValuesForStringEncodedNumericTypes()}.
     */
    @Test
    @FixFor({"DBZ-611"})
    public void shouldReceiveNumericTypeAsString() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.STRING));

        final String insertNumerics = "INSERT INTO numeric_decimal_table (d, dzs, dvs, d_nn, n, nzs, nvs, d_int, dzs_int, dvs_int, n_int, nzs_int, nvs_int, d_nan, dzs_nan, dvs_nan, n_nan, nzs_nan, nvs_nan) VALUES (1.1, 10.11, 10.1111, 3.30, 22.22, 22.2, 22.2222, 1, 10, 10, 22, 22, 22, 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN')";
        assertInsert(insertNumerics, 1, schemasAndValuesForStringEncodedNumericTypes());
    }

    /**
     * With {@code HSTORE_HANDLING_MODE = MAP}, inserts a single-entry hstore value and checks the
     * record against {@code schemaAndValueFieldForMapEncodedHStoreType()}.
     */
    @Test
    @FixFor({"DBZ-898"})
    public void shouldReceiveHStoreTypeWithSingleValueAsMap() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE, PostgresConnectorConfig.HStoreHandlingMode.MAP));

        final String insertHstore = "INSERT INTO hstore_table (hs) VALUES ('\"key\" => \"val\"'::hstore)";
        assertInsert(insertHstore, 1, schemaAndValueFieldForMapEncodedHStoreType());
    }

    /**
     * With {@code HSTORE_HANDLING_MODE = MAP}, inserts a multi-entry hstore value plus an hstore
     * array and checks the record against
     * {@code schemaAndValueFieldForMapEncodedHStoreTypeWithMultipleValues()}.
     */
    @Test
    @FixFor({"DBZ-898"})
    public void shouldReceiveHStoreTypeWithMultipleValuesAsMap() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE, PostgresConnectorConfig.HStoreHandlingMode.MAP));

        final String insertHstore = "INSERT INTO hstore_table_mul (hs, hsarr) VALUES ('\"key1\" => \"val1\",\"key2\" => \"val2\",\"key3\" => \"val3\"', array['\"key4\" => \"val4\",\"key5\" => NULL'::hstore, '\"key6\" => \"val6\"'])";
        assertInsert(insertHstore, 1, schemaAndValueFieldForMapEncodedHStoreTypeWithMultipleValues());
    }

    /**
     * With {@code HSTORE_HANDLING_MODE = MAP}, inserts an hstore value containing a {@code NULL}
     * entry and checks the record against
     * {@code schemaAndValueFieldForMapEncodedHStoreTypeWithNullValues()}.
     */
    @Test
    @FixFor({"DBZ-898"})
    public void shouldReceiveHStoreTypeWithNullValuesAsMap() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE, PostgresConnectorConfig.HStoreHandlingMode.MAP));

        final String insertHstore = "INSERT INTO hstore_table_with_null (hs) VALUES ('\"key1\" => \"val1\",\"key2\" => NULL')";
        assertInsert(insertHstore, 1, schemaAndValueFieldForMapEncodedHStoreTypeWithNullValues());
    }

    /**
     * With {@code HSTORE_HANDLING_MODE = MAP}, inserts an hstore value whose keys and values
     * contain spaces and {@code #} characters and checks the record against
     * {@code schemaAndValueFieldForMapEncodedHStoreTypeWithSpecialCharacters()}.
     */
    @Test
    @FixFor({"DBZ-898"})
    public void shouldReceiveHStoreTypeWithSpecialCharactersInValuesAsMap() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE, PostgresConnectorConfig.HStoreHandlingMode.MAP));

        final String insertHstore = "INSERT INTO hstore_table_with_special (hs) VALUES ('\"key_#1\" => \"val 1\",\"key 2\" =>\" ##123 78\"')";
        assertInsert(insertHstore, 1, schemaAndValueFieldForMapEncodedHStoreTypeWithSpecialCharacters());
    }

    /**
     * With {@code HSTORE_HANDLING_MODE = JSON}, inserts a single-entry hstore value and checks the
     * record against {@code schemaAndValueFieldForJsonEncodedHStoreType()}. A one-record test
     * consumer is assigned before the connector is started.
     */
    @Test
    @FixFor({"DBZ-898"})
    public void shouldReceiveHStoreTypeAsJsonString() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        consumer = testConsumer(1, new String[0]);
        startConnector(builder -> builder.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE, PostgresConnectorConfig.HStoreHandlingMode.JSON));

        final String insertHstore = "INSERT INTO hstore_table (hs) VALUES ('\"key\" => \"val\"'::hstore)";
        assertInsert(insertHstore, 1, schemaAndValueFieldForJsonEncodedHStoreType());
    }

    /**
     * With {@code HSTORE_HANDLING_MODE = JSON}, inserts a multi-entry hstore value plus an hstore
     * array and checks the record against
     * {@code schemaAndValueFieldForJsonEncodedHStoreTypeWithMultipleValues()}.
     */
    @Test
    @FixFor({"DBZ-898"})
    public void shouldReceiveHStoreTypeWithMultipleValuesAsJsonString() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE, PostgresConnectorConfig.HStoreHandlingMode.JSON));

        final String insertHstore = "INSERT INTO hstore_table_mul (hs, hsarr) VALUES ('\"key1\" => \"val1\",\"key2\" => \"val2\",\"key3\" => \"val3\"', array['\"key4\" => \"val4\",\"key5\" => NULL'::hstore, '\"key6\" => \"val6\"'])";
        assertInsert(insertHstore, 1, schemaAndValueFieldForJsonEncodedHStoreTypeWithMultipleValues());
    }

    /**
     * With {@code HSTORE_HANDLING_MODE = JSON}, inserts an hstore value whose keys and values
     * contain spaces and {@code #} characters and checks the record against
     * {@code schemaAndValueFieldForJsonEncodedHStoreTypeWithSpcialCharacters()}.
     */
    @Test
    @FixFor({"DBZ-898"})
    public void shouldReceiveHStoreTypeWithSpecialValuesInJsonString() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE, PostgresConnectorConfig.HStoreHandlingMode.JSON));

        final String insertHstore = "INSERT INTO hstore_table_with_special (hs) VALUES ('\"key_#1\" => \"val 1\",\"key 2\" =>\" ##123 78\"')";
        assertInsert(insertHstore, 1, schemaAndValueFieldForJsonEncodedHStoreTypeWithSpcialCharacters());
    }

    /**
     * With {@code HSTORE_HANDLING_MODE = JSON}, inserts an hstore value containing a {@code NULL}
     * entry and checks the record against
     * {@code schemaAndValueFieldForJsonEncodedHStoreTypeWithNullValues()}.
     */
    @Test
    @FixFor({"DBZ-898"})
    public void shouldReceiveHStoreTypeWithNullValuesAsJsonString() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder.with(PostgresConnectorConfig.HSTORE_HANDLING_MODE, PostgresConnectorConfig.HStoreHandlingMode.JSON));

        final String insertHstore = "INSERT INTO hstore_table_with_null (hs) VALUES ('\"key1\" => \"val1\",\"key2\" => NULL')";
        assertInsert(insertHstore, 1, schemaAndValueFieldForJsonEncodedHStoreTypeWithNullValues());
    }

    /**
     * With {@code BINARY_HANDLING_MODE = BYTES}, inserts a three-byte {@code bytea} value and
     * checks the record against {@code schemaAndValueForByteaBytes()}.
     */
    @Test
    @FixFor({"DBZ-1814"})
    public void shouldReceiveByteaBytes() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder.with(PostgresConnectorConfig.BINARY_HANDLING_MODE, CommonConnectorConfig.BinaryHandlingMode.BYTES));

        final String insertBytea = "INSERT INTO bytea_binmode_table (ba) VALUES (E'\\\\001\\\\002\\\\003'::bytea)";
        assertInsert(insertBytea, 1, schemaAndValueForByteaBytes());
    }

    /**
     * With {@code BINARY_HANDLING_MODE = BASE64}, inserts a three-byte {@code bytea} value and
     * checks the record against {@code schemaAndValueForByteaBase64()}.
     */
    @Test
    @FixFor({"DBZ-1814"})
    public void shouldReceiveByteaBase64String() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder.with(PostgresConnectorConfig.BINARY_HANDLING_MODE, CommonConnectorConfig.BinaryHandlingMode.BASE64));

        final String insertBytea = "INSERT INTO bytea_binmode_table (ba) VALUES (E'\\\\001\\\\002\\\\003'::bytea)";
        assertInsert(insertBytea, 1, schemaAndValueForByteaBase64());
    }

    /**
     * With {@code BINARY_HANDLING_MODE = HEX}, inserts a three-byte {@code bytea} value and
     * checks the record against {@code schemaAndValueForByteaHex()}.
     */
    @Test
    @FixFor({"DBZ-1814"})
    public void shouldReceiveByteaHexString() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder.with(PostgresConnectorConfig.BINARY_HANDLING_MODE, CommonConnectorConfig.BinaryHandlingMode.HEX));

        final String insertBytea = "INSERT INTO bytea_binmode_table (ba) VALUES (E'\\\\001\\\\002\\\\003'::bytea)";
        assertInsert(insertBytea, 1, schemaAndValueForByteaHex());
    }

    /**
     * With {@code INCLUDE_UNKNOWN_DATATYPES} enabled and no explicit binary handling mode,
     * inserts a {@code circle} value and checks the record against
     * {@code schemaAndValueForUnknownColumnBytes()}.
     */
    @Test
    @FixFor({"DBZ-1814"})
    public void shouldReceiveUnknownTypeAsBytes() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder.with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true));

        final String insertCircle = "INSERT INTO circle_table (ccircle) VALUES ('((10, 20),10)'::circle)";
        assertInsert(insertCircle, 1, schemaAndValueForUnknownColumnBytes());
    }

    /**
     * With {@code INCLUDE_UNKNOWN_DATATYPES} enabled and {@code BINARY_HANDLING_MODE = BASE64},
     * inserts a {@code circle} value and checks the record against
     * {@code schemaAndValueForUnknownColumnBase64()}.
     */
    @Test
    @FixFor({"DBZ-1814"})
    public void shouldReceiveUnknownTypeAsBase64() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder
                .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
                .with(PostgresConnectorConfig.BINARY_HANDLING_MODE, CommonConnectorConfig.BinaryHandlingMode.BASE64));

        final String insertCircle = "INSERT INTO circle_table (ccircle) VALUES ('((10, 20),10)'::circle)";
        assertInsert(insertCircle, 1, schemaAndValueForUnknownColumnBase64());
    }

    /**
     * With {@code INCLUDE_UNKNOWN_DATATYPES} enabled and {@code BINARY_HANDLING_MODE = HEX},
     * inserts a {@code circle} value and checks the record against
     * {@code schemaAndValueForUnknownColumnHex()}.
     */
    @Test
    @FixFor({"DBZ-1814"})
    public void shouldReceiveUnknownTypeAsHex() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder
                .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
                .with(PostgresConnectorConfig.BINARY_HANDLING_MODE, CommonConnectorConfig.BinaryHandlingMode.HEX));

        final String insertCircle = "INSERT INTO circle_table (ccircle) VALUES ('((10, 20),10)'::circle)";
        assertInsert(insertCircle, 1, schemaAndValueForUnknownColumnHex());
    }

    /**
     * Inserts two rows into {@code table_with_interval}, deletes the first one and expects four
     * records: two inserts, then a delete and a tombstone for {@code id = 1}.
     */
    @Test
    @FixFor({"DBZ-259"})
    public void shouldProcessIntervalDelete() throws Exception {
        startConnector();
        consumer.expects(4);

        final String insertTwoDeleteOne = "INSERT INTO table_with_interval VALUES (default, 'Foo', default);INSERT INTO table_with_interval VALUES (default, 'Bar', default);DELETE FROM table_with_interval WHERE id = 1;";
        executeAndWait(insertTwoDeleteOne);
        String topic = TestHelper.topicName("public.table_with_interval");

        assertRecordInserted("public.table_with_interval", "id", 1);
        assertRecordInserted("public.table_with_interval", "id", 2);

        SourceRecord deleteRecord = consumer.remove();
        TestCase.assertEquals(topic, deleteRecord.topic());
        VerifyRecord.isValidDelete(deleteRecord, "id", 1);

        SourceRecord tombstoneRecord = consumer.remove();
        TestCase.assertEquals(topic, tombstoneRecord.topic());
        VerifyRecord.isValidTombstone(tombstoneRecord, "id", 1);
    }

    /**
     * Sets {@code column.propagate.source.type} to {@code .*vc.*}, inserts into
     * {@code string_table} and checks the record against
     * {@code schemasAndValuesForStringTypesWithSourceColumnTypeInfo()}.
     */
    @Test
    @FixFor({"DBZ-644"})
    public void shouldPropagateSourceColumnTypeToSchemaParameter() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder.with("column.propagate.source.type", ".*vc.*"));

        final String insertStrings = "INSERT INTO string_table (vc, vcv, ch, c, t, b, bnn, ct) VALUES ('žš', 'bb', 'cdef', 'abc', 'some text', E'\\\\000\\\\001\\\\002'::bytea, E'\\\\003\\\\004\\\\005'::bytea, 'Hello World')";
        assertInsert(insertStrings, 1, schemasAndValuesForStringTypesWithSourceColumnTypeInfo());
    }

    /**
     * Sets {@code column.propagate.source.type} to {@code .*(d|dzs)} together with
     * {@code DECIMAL_HANDLING_MODE = DOUBLE}, inserts into {@code numeric_decimal_table} and checks
     * the record against {@code schemasAndValuesForNumericTypesWithSourceColumnTypeInfo()}.
     */
    @Test
    @FixFor({"DBZ-1073"})
    public void shouldPropagateSourceColumnTypeScaleToSchemaParameter() throws Exception {
        TestHelper.executeDDL("postgres_create_tables.ddl");
        startConnector(builder -> builder
                .with("column.propagate.source.type", ".*(d|dzs)")
                .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE));

        final String insertNumerics = "INSERT INTO numeric_decimal_table (d, dzs, dvs, d_nn, n, nzs, nvs, d_int, dzs_int, dvs_int, n_int, nzs_int, nvs_int, d_nan, dzs_nan, dvs_nan, n_nan, nzs_nan, nvs_nan) VALUES (1.1, 10.11, 10.1111, 3.30, 22.22, 22.2, 22.2222, 1, 10, 10, 22, 22, 22, 'NaN', 'NaN', 'NaN', 'NaN', 'NaN', 'NaN')";
        assertInsert(insertNumerics, 1, schemasAndValuesForNumericTypesWithSourceColumnTypeInfo());
    }

    /**
     * Runs the connector with heartbeats enabled and only {@code s1.b} in the table include list.
     * <p>
     * First waits until the insert into {@code s1.b} and at least one heartbeat record have been
     * consumed. Then inserts into the non-included table {@code s1.a} and waits until consumed
     * records have reported at least two distinct {@code lsn} source-offset values.
     */
    @Test
    @FixFor({"DBZ-800"})
    public void shouldReceiveHeartbeatAlsoWhenChangingNonWhitelistedTable() throws Exception {
        startConnector(builder -> builder
                .with(Heartbeat.HEARTBEAT_INTERVAL, "100")
                .with(PostgresConnectorConfig.POLL_INTERVAL_MS, "50")
                .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1\\.b")
                .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER), false);
        waitForStreamingToStart();
        TestHelper.execute("CREATE SCHEMA s1;CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s1.b (pk SERIAL, bb integer, PRIMARY KEY(pk));INSERT INTO s1.b (bb) VALUES (22);", new String[0]);
        try {
            AtomicInteger heartbeatCount = new AtomicInteger();
            AtomicBoolean insertSeen = new AtomicBoolean();
            Awaitility.await().atMost(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS).until(() -> {
                SourceRecord record = consumeRecord();
                if (record != null) {
                    if (record.topic().endsWith("s1.b")) {
                        assertRecordInserted(record, "s1.b", "pk", 1);
                        insertSeen.set(true);
                    } else {
                        // Anything not on the s1.b topic must be a heartbeat.
                        assertHeartBeatRecord(record);
                        heartbeatCount.incrementAndGet();
                    }
                }
                return insertSeen.get() && heartbeatCount.get() > 0;
            });
        } catch (ConditionTimeoutException e) {
            Fail.fail("Failed to receive insert and at least 1 heartbeat message", e);
        }

        // Distinct LSNs reported by records consumed after the change to the non-included table.
        HashSet<Long> observedLsns = new HashSet<>();
        TestHelper.execute("INSERT INTO s1.a (aa) VALUES (11);", new String[0]);
        try {
            Awaitility.await().atMost(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS).until(() -> {
                SourceRecord record = consumeRecord();
                if (record == null) {
                    return false;
                }
                observedLsns.add((Long) record.sourceOffset().get("lsn"));
                return observedLsns.size() >= 2;
            });
        } catch (ConditionTimeoutException e2) {
            Fail.fail("Failed to detect at least 2 LSN changes", e2);
        }
    }

    /**
     * Runs the connector without heartbeats and only {@code s1.b} in the table include list, then
     * issues 10100 inserts into the non-included table {@code s1.a} and waits for the warning log
     * message stating that 10001 events were all filtered out.
     */
    @Test
    @FixFor({"DBZ-1565"})
    public void shouldWarnOnMissingHeartbeatForFilteredEvents() throws Exception {
        LogInterceptor interceptor = new LogInterceptor();
        startConnector(builder -> builder
                .with(PostgresConnectorConfig.POLL_INTERVAL_MS, "50")
                .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1\\.b")
                .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER), false);
        waitForStreamingToStart();
        consumer = testConsumer(1, new String[0]);
        executeAndWait("CREATE SCHEMA s1;CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s1.b (pk SERIAL, bb integer, PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (11);INSERT INTO s1.b (bb) VALUES (22);");

        // One SQL script made of 10100 identical inserts into the filtered table.
        String filteredInserts = IntStream.range(0, 10100)
                .mapToObj(ignored -> "INSERT INTO s1.a (pk) VALUES (default);")
                .collect(Collectors.joining());
        TestHelper.execute(filteredInserts, new String[0]);

        Awaitility.await()
                .alias("WAL growing log message")
                .pollInterval(1L, TimeUnit.SECONDS)
                .atMost(5 * TestHelper.waitTimeForRecords(), TimeUnit.SECONDS)
                .until(() -> Boolean.valueOf(interceptor.containsWarnMessage("Received 10001 events which were all filtered out, so no offset could be committed. This prevents the replication slot from acknowledging the processed WAL offsets, causing a growing backlog of non-removeable WAL segments on the database server. Consider to either adjust your filter configuration or enable heartbeat events (via the heartbeat.interval.ms option) to avoid this situation.")));
    }

    /**
     * Checks that an update leaving the large {@code text} value untouched does not refresh the
     * in-memory table schema when {@code COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST} is configured.
     *
     * <p>The {@code text} column is dropped in the same batch as the update, yet the schema held
     * by the task is still expected to list {@code pk}, {@code text} and {@code not_toast}.
     * Annotated to be skipped for the PGOUTPUT decoder.
     */
    @Test
    @SkipWhenDecoderPluginNameIs(value = SkipWhenDecoderPluginNameIs.DecoderPluginName.PGOUTPUT, reason = "Decoder synchronizes all schema columns when processing relation messages")
    @FixFor({"DBZ-911"})
    public void shouldNotRefreshSchemaOnUnchangedToastedData() throws Exception {
        startConnector(config -> config.with(
                PostgresConnectorConfig.SCHEMA_REFRESH_MODE,
                PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST));

        // 10000 random characters; presumably large enough to be stored out of line (TOAST).
        final String largeText = RandomStringUtils.randomAlphanumeric(10000);

        consumer = testConsumer(1, new String[0]);
        executeAndWait("ALTER TABLE test_table ADD COLUMN not_toast integer; INSERT INTO test_table (not_toast, text) values (10, '" + largeText + "')");
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, largeText)),
                consumer.remove(),
                "after");

        // Drop the large column and update a different one in a single batch.
        consumer.expects(1);
        executeAndWait("ALTER TABLE test_table DROP COLUMN text; update test_table set not_toast = 5 where not_toast = 10");

        // The cached schema must still contain the dropped column.
        assertWithTask(task -> TestCase.assertEquals(
                Arrays.asList("pk", "text", "not_toast"),
                ((PostgresConnectorTask) task).getTaskContext().schema().tableFor(TableId.parse("public.test_table")).retrieveColumnNames()));
        TestHelper.assertNoOpenTransactions();
    }

    /**
     * Runs the same drop-column-plus-update scenario as the non-refresh test, but expects the
     * schema held by the task to reflect the drop: only {@code pk} and {@code not_toast} remain.
     *
     * <p>Annotated to be skipped unless the decoder plugin is PGOUTPUT.
     */
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Decoder synchronizes all schema columns when processing relation messages")
    @FixFor({"DBZ-911"})
    public void shouldRefreshSchemaOnUnchangedToastedDataWhenSchemaChanged() throws Exception {
        startConnector(config -> config.with(
                PostgresConnectorConfig.SCHEMA_REFRESH_MODE,
                PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST));

        final String largeText = RandomStringUtils.randomAlphanumeric(10000);

        consumer = testConsumer(1, new String[0]);
        executeAndWait("ALTER TABLE test_table ADD COLUMN not_toast integer; INSERT INTO test_table (not_toast, text) values (10, '" + largeText + "')");
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, largeText)),
                consumer.remove(),
                "after");

        consumer.expects(1);
        executeAndWait("ALTER TABLE test_table DROP COLUMN text; update test_table set not_toast = 5 where not_toast = 10");

        // Here the dropped column must be gone from the cached schema.
        assertWithTask(task -> TestCase.assertEquals(
                Arrays.asList("pk", "not_toast"),
                ((PostgresConnectorTask) task).getTaskContext().schema().tableFor(TableId.parse("public.test_table")).retrieveColumnNames()));
    }

    /**
     * Verifies what is emitted for large text columns that an UPDATE does not touch.
     *
     * <p>Two rows with 10000-character {@code text} and {@code mandatory_text} values are
     * inserted, then every row of {@code test_table} is updated twice on {@code not_toast}
     * only. Six update records are expected. The row asserted first in each round carries
     * {@code "insert"} and an empty {@code mandatory_text}; the two freshly inserted rows carry
     * the placeholders supplied by {@link DecoderDifferences}.
     */
    @Test
    @FixFor({"DBZ-842"})
    public void shouldNotPropagateUnchangedToastedData() throws Exception {
        startConnector(config -> config.with(
                PostgresConnectorConfig.SCHEMA_REFRESH_MODE,
                PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST));

        final String firstText = RandomStringUtils.randomAlphanumeric(10000);
        final String secondText = RandomStringUtils.randomAlphanumeric(10000);
        final String defaultText = RandomStringUtils.randomAlphanumeric(10000);
        final String setupSql = "ALTER TABLE test_table ADD COLUMN not_toast integer;"
                + "ALTER TABLE test_table ADD COLUMN mandatory_text TEXT NOT NULL DEFAULT '';"
                + "ALTER TABLE test_table ALTER COLUMN mandatory_text SET STORAGE EXTENDED;"
                + "ALTER TABLE test_table ALTER COLUMN mandatory_text SET DEFAULT '" + defaultText + "';"
                + "INSERT INTO test_table (not_toast, text, mandatory_text) values (10, '" + firstText + "', '" + firstText + "');"
                + "INSERT INTO test_table (not_toast, text, mandatory_text) values (10, '" + secondText + "', '" + secondText + "');";

        consumer = testConsumer(2, new String[0]);
        executeAndWait(setupSql);

        // The two inserts carry the full large values.
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, firstText),
                        new AbstractRecordsProducerTest.SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, firstText)),
                consumer.remove(),
                "after");
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 10),
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, secondText),
                        new AbstractRecordsProducerTest.SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, secondText)),
                consumer.remove(),
                "after");

        // Two table-wide updates; six records are awaited in total.
        consumer.expects(6);
        executeAndWait("UPDATE test_table SET not_toast = 2;UPDATE test_table SET not_toast = 3;");

        // For every record, the cached schema must keep all four columns.
        consumer.process(record -> assertWithTask(task -> TestCase.assertEquals(
                Arrays.asList("pk", "text", "not_toast", "mandatory_text"),
                ((PostgresConnectorTask) task).getTaskContext().schema().tableFor(TableId.parse("public.test_table")).retrieveColumnNames())));

        // First update round (not_toast = 2).
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 2),
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "insert"),
                        new AbstractRecordsProducerTest.SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, "")),
                consumer.remove(),
                "after");
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 2),
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, DecoderDifferences.optionalToastedValuePlaceholder()),
                        new AbstractRecordsProducerTest.SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, DecoderDifferences.mandatoryToastedValuePlaceholder())),
                consumer.remove(),
                "after");
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 2),
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, DecoderDifferences.optionalToastedValuePlaceholder()),
                        new AbstractRecordsProducerTest.SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, DecoderDifferences.mandatoryToastedValuePlaceholder())),
                consumer.remove(),
                "after");

        // Second update round (not_toast = 3).
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3),
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "insert"),
                        new AbstractRecordsProducerTest.SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, "")),
                consumer.remove(),
                "after");
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3),
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, DecoderDifferences.optionalToastedValuePlaceholder()),
                        new AbstractRecordsProducerTest.SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, DecoderDifferences.mandatoryToastedValuePlaceholder())),
                consumer.remove(),
                "after");
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("not_toast", SchemaBuilder.OPTIONAL_INT32_SCHEMA, 3),
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, DecoderDifferences.optionalToastedValuePlaceholder()),
                        new AbstractRecordsProducerTest.SchemaAndValueField("mandatory_text", SchemaBuilder.STRING_SCHEMA, DecoderDifferences.mandatoryToastedValuePlaceholder())),
                consumer.remove(),
                "after");
    }

    /**
     * Streams an insert, an update and a delete for a table that has no primary key but is
     * set to {@code REPLICA IDENTITY FULL}, checking the {@code before} and {@code after}
     * images of each change.
     */
    @Test
    @FixFor({"DBZ-1029"})
    public void shouldReceiveChangesForTableWithoutPrimaryKey() throws Exception {
        TestHelper.execute(
                "DROP TABLE IF EXISTS test_table;",
                "CREATE TABLE test_table (id SERIAL, text TEXT);",
                "ALTER TABLE test_table REPLICA IDENTITY FULL");
        startConnector(Function.identity(), false);
        consumer = testConsumer(1, new String[0]);

        // INSERT
        assertInsert(
                "INSERT INTO test_table (text) VALUES ('a');",
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1),
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "a")));

        // UPDATE: both row images are checked.
        consumer.expects(1);
        executeAndWait("UPDATE test_table set text='b' WHERE id=1");
        SourceRecord updateRecord = consumer.remove();
        VerifyRecord.isValidUpdate(updateRecord);
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1),
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "a")),
                updateRecord,
                "before");
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1),
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "b")),
                updateRecord,
                "after");

        // DELETE: two records are awaited, only the first one is inspected.
        consumer.expects(2);
        executeAndWait("DELETE FROM test_table WHERE id=1");
        SourceRecord deleteRecord = consumer.remove();
        VerifyRecord.isValidDelete(deleteRecord);
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("id", SchemaBuilder.INT32_SCHEMA, 1),
                        new AbstractRecordsProducerTest.SchemaAndValueField("text", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "b")),
                deleteRecord,
                "before");
        assertRecordSchemaAndValues(null, deleteRecord, "after");
    }

    /**
     * Passes {@code add-tables=s1.should_stream} as a stream parameter and checks that only
     * the insert into {@code s1.should_stream} is received.
     *
     * <p>Annotated to be skipped unless the decoder plugin is WAL2JSON.
     */
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.WAL2JSON, reason = "WAL2JSON specific: Pass 'add-tables' stream parameter and verify it acts as an include list")
    @FixFor({"DBZ-1130"})
    public void testPassingStreamParams() throws Exception {
        final String setupSql = "CREATE SCHEMA s1;"
                + "CREATE TABLE s1.should_stream (pk SERIAL, aa integer, PRIMARY KEY(pk));"
                + "CREATE TABLE s1.should_not_stream (pk SERIAL, aa integer, PRIMARY KEY(pk));"
                + "INSERT INTO s1.should_not_stream (aa) VALUES (456);"
                + "INSERT INTO s1.should_stream (aa) VALUES (123);";

        startConnector(config -> config.with(PostgresConnectorConfig.STREAM_PARAMS, "add-tables=s1.should_stream"));
        consumer = testConsumer(1, new String[0]);
        executeAndWait(setupSql);

        // Exactly one record: the insert into the table named in add-tables.
        assertRecordInserted("s1.should_stream", "pk", 1);
        Assertions.assertThat(consumer.isEmpty()).isTrue();
    }

    /**
     * Passes two stream parameters at once ({@code add-tables} and {@code filter-tables},
     * separated by a semicolon) and checks that only the inserts into {@code s1.should_stream}
     * and {@code s2.should_stream} are received.
     *
     * <p>Annotated to be skipped unless the decoder plugin is WAL2JSON.
     */
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.WAL2JSON, reason = "WAL2JSON specific: Pass multiple stream parameters and values verifying they work")
    @FixFor({"DBZ-1130"})
    public void testPassingStreamMultipleParams() throws Exception {
        final String setupSql = "CREATE SCHEMA s1;"
                + "CREATE SCHEMA s2;"
                + "CREATE TABLE s1.should_stream (pk SERIAL, aa integer, PRIMARY KEY(pk));"
                + "CREATE TABLE s2.should_stream (pk SERIAL, aa integer, PRIMARY KEY(pk));"
                + "CREATE TABLE s1.should_not_stream (pk SERIAL, aa integer, PRIMARY KEY(pk));"
                + "CREATE TABLE s2.should_not_stream (pk SERIAL, aa integer, PRIMARY KEY(pk));"
                + "INSERT INTO s1.should_not_stream (aa) VALUES (456);"
                + "INSERT INTO s2.should_not_stream (aa) VALUES (111);"
                + "INSERT INTO s1.should_stream (aa) VALUES (123);"
                + "INSERT INTO s2.should_stream (aa) VALUES (999);";

        startConnector(config -> config.with(
                PostgresConnectorConfig.STREAM_PARAMS,
                "add-tables=s1.should_stream,s2.*;filter-tables=s2.should_not_stream"));
        consumer = testConsumer(2, new String[0]);
        executeAndWait(setupSql);

        // One record per should_stream table, nothing else.
        assertRecordInserted("s1.should_stream", "pk", 1);
        assertRecordInserted("s2.should_stream", "pk", 1);
        Assertions.assertThat(consumer.isEmpty()).isTrue();
    }

    /**
     * Runs the shared replica-identity-full / toasted-value scenario with the
     * {@code COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST} refresh mode and the helper's flag set to
     * {@code true}.
     */
    @Test
    @FixFor({"DBZ-1146"})
    public void shouldReceiveChangesForReplicaIdentityFullTableWithToastedValueTableFromSnapshot() throws Exception {
        final PostgresConnectorConfig.SchemaRefreshMode refreshMode =
                PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST;
        // NOTE(review): going by the test name, true presumably selects the "from snapshot" path - confirm against the helper.
        testReceiveChangesForReplicaIdentityFullTableWithToastedValue(refreshMode, true);
    }

    /**
     * Runs the shared replica-identity-full / toasted-value scenario with the
     * {@code COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST} refresh mode and the helper's flag set to
     * {@code false}.
     */
    @Test
    @FixFor({"DBZ-1146"})
    public void shouldReceiveChangesForReplicaIdentityFullTableWithToastedValueTableFromStreaming() throws Exception {
        final PostgresConnectorConfig.SchemaRefreshMode refreshMode =
                PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST;
        // NOTE(review): going by the test name, false presumably selects the "from streaming" path - confirm against the helper.
        testReceiveChangesForReplicaIdentityFullTableWithToastedValue(refreshMode, false);
    }

    /**
     * Runs the shared replica-identity-full / toasted-value scenario with the
     * {@code COLUMNS_DIFF} refresh mode and the helper's flag set to {@code true}.
     */
    @Test
    @FixFor({"DBZ-1146"})
    public void shouldReceiveChangesForReplicaIdentityFullTableWithToastedValueTableFromSnapshotFullDiff() throws Exception {
        final PostgresConnectorConfig.SchemaRefreshMode refreshMode =
                PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF;
        // NOTE(review): going by the test name, true presumably selects the "from snapshot" path - confirm against the helper.
        testReceiveChangesForReplicaIdentityFullTableWithToastedValue(refreshMode, true);
    }

    /**
     * Runs the shared replica-identity-full / toasted-value scenario with the
     * {@code COLUMNS_DIFF} refresh mode and the helper's flag set to {@code false}.
     */
    @Test
    @FixFor({"DBZ-1146"})
    public void shouldReceiveChangesForReplicaIdentityFullTableWithToastedValueTableFromStreamingFullDiff() throws Exception {
        final PostgresConnectorConfig.SchemaRefreshMode refreshMode =
                PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF;
        // NOTE(review): going by the test name, false presumably selects the "from streaming" path - confirm against the helper.
        testReceiveChangesForReplicaIdentityFullTableWithToastedValue(refreshMode, false);
    }

    /**
     * Verifies that heartbeat records keep flowing, and that their LSN advances, when the
     * database only sees changes that produce no data change event.
     *
     * <p>Steps: start with a 100 ms heartbeat interval, recreate {@code test_table} and insert
     * one row, wait for the first envelope-schema record, record the LSN of the next heartbeat,
     * run {@code CREATE SCHEMA s1}, then wait until a heartbeat with a second distinct LSN is
     * seen.
     *
     * <p>Every polling condition treats a {@code null} result from {@code consumeRecord()} as
     * "nothing yet" and retries. The last condition previously dereferenced the record
     * unconditionally, so an empty poll would have thrown a {@link NullPointerException} out of
     * the condition rather than letting the wait continue.
     */
    @Test
    @FixFor({"DBZ-1181"})
    public void testEmptyChangesProducesHeartbeat() throws Exception {
        startConnector(builder -> builder.with(Heartbeat.HEARTBEAT_INTERVAL, "100"));
        waitForStreamingToStart();
        TestHelper.execute("DROP TABLE IF EXISTS test_table;CREATE TABLE test_table (id SERIAL, text TEXT);INSERT INTO test_table (text) VALUES ('mydata');", new String[0]);

        // Consume records until the first one with an envelope schema (the data change) arrives.
        Awaitility.await().atMost(TestHelper.waitTimeForRecords() * 10, TimeUnit.SECONDS).until(() -> {
            SourceRecord record = consumeRecord();
            return record != null && Envelope.isEnvelopeSchema(record.valueSchema());
        });

        // Distinct LSNs observed on heartbeat records.
        HashSet<Long> heartbeatLsns = new HashSet<>();

        // The next record received must be a heartbeat; remember its LSN.
        Awaitility.await().atMost(TestHelper.waitTimeForRecords() * 10, TimeUnit.SECONDS).until(() -> {
            SourceRecord record = consumeRecord();
            if (record == null) {
                return false;
            }
            Assertions.assertThat(record.valueSchema().name()).endsWith(".Heartbeat");
            heartbeatLsns.add((Long) record.sourceOffset().get("lsn"));
            return true;
        });

        TestHelper.execute("CREATE SCHEMA s1;", new String[0]);

        // Keep reading heartbeats until one carries a second, different LSN.
        Awaitility.await().atMost(TestHelper.waitTimeForRecords() * 10, TimeUnit.SECONDS).until(() -> {
            SourceRecord record = consumeRecord();
            if (record == null) {
                // Nothing polled within the timeout; retry instead of failing with an NPE.
                return false;
            }
            Assertions.assertThat(record.valueSchema().name()).endsWith(".Heartbeat");
            heartbeatLsns.add((Long) record.sourceOffset().get("lsn"));
            return heartbeatLsns.size() == 2;
        });
        Assertions.assertThat(this.consumer.isEmpty()).isTrue();
    }

    /**
     * With {@code XMIN_FETCH_INTERVAL} set to {@code "0"}, the {@code source.xmin} field of a
     * streamed insert is expected to be {@code null}.
     */
    @Test
    @FixFor({"DBZ-1082"})
    public void shouldHaveNoXminWhenNotEnabled() throws Exception {
        startConnector(config -> config.with(PostgresConnectorConfig.XMIN_FETCH_INTERVAL, "0"));
        TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;", new String[0]);
        executeAndWait("INSERT INTO test_table (text) VALUES ('no_xmin');");

        SourceRecord inserted = assertRecordInserted("public.test_table", "pk", 2);
        assertSourceInfo(inserted, "postgres", "public", "test_table");

        Struct source = ((Struct) inserted.value()).getStruct("source");
        Assertions.assertThat(source.getInt64("xmin")).isNull();
        Assertions.assertThat(consumer.isEmpty()).isTrue();
    }

    /**
     * With {@code XMIN_FETCH_INTERVAL} set to {@code "10"}, the {@code source.xmin} field of a
     * streamed insert is expected to be greater than zero.
     */
    @Test
    @FixFor({"DBZ-1082"})
    public void shouldHaveXminWhenEnabled() throws Exception {
        startConnector(config -> config.with(PostgresConnectorConfig.XMIN_FETCH_INTERVAL, "10"));
        TestHelper.execute("ALTER TABLE test_table REPLICA IDENTITY DEFAULT;", new String[0]);
        executeAndWait("INSERT INTO test_table (text) VALUES ('with_xmin');");

        SourceRecord inserted = assertRecordInserted("public.test_table", "pk", 2);
        assertSourceInfo(inserted, "postgres", "public", "test_table");

        Struct source = ((Struct) inserted.value()).getStruct("source");
        Assertions.assertThat(source.getInt64("xmin")).isGreaterThan(0L);
        Assertions.assertThat(consumer.isEmpty()).isTrue();
    }

    /**
     * Streams 1000 inserts sent as one batch and then 1000 inserts sent one statement at a
     * time, checking topic and primary key of every record and logging both durations.
     *
     * <p>The wait for the second batch is capped at three times the duration measured for the
     * first batch.
     */
    @Test
    public void shouldProcessLargerTx() throws Exception {
        Testing.Print.disable();
        startConnector();
        waitForStreamingToStart();

        final String topic = TestHelper.topicName("public.test_table");
        final Stopwatch timer = Stopwatch.reusable();

        // Batch 1: all statements joined with ';' and executed in one call.
        consumer = testConsumer(1000, new String[0]);
        timer.start();
        executeAndWait(IntStream.rangeClosed(2, 1001)
                .boxed()
                .map(n -> "INSERT INTO test_table (text) VALUES ('insert" + n + "')")
                .collect(Collectors.joining(";")));
        timer.stop();
        final long singleTxMillis = timer.durations().statistics().getTotal().toMillis();
        logger.info("Single tx duration = {} ms", Long.valueOf(singleTxMillis));

        // Primary keys 2..1001 are expected, in order.
        for (int pk = 2; pk <= 1001; pk++) {
            SourceRecord record = consumer.remove();
            TestCase.assertEquals(topic, record.topic());
            VerifyRecord.isValidInsert(record, "pk", pk);
        }

        // Batch 2: the same statements, each executed on its own.
        consumer.expects(1000);
        for (int n = 2; n <= 1001; n++) {
            TestHelper.execute("INSERT INTO test_table (text) VALUES ('insert" + n + "')", new String[0]);
        }
        timer.start();
        consumer.await(3 * singleTxMillis, TimeUnit.MILLISECONDS);
        timer.stop();

        // Primary keys 1002..2001 are expected, in order.
        for (int pk = 1002; pk <= 2001; pk++) {
            SourceRecord record = consumer.remove();
            TestCase.assertEquals(topic, record.topic());
            VerifyRecord.isValidInsert(record, "pk", pk);
        }
        logger.info("Many tx duration = {} ms", Long.valueOf(timer.durations().statistics().getTotal().toMillis()));
    }

    /**
     * Stops the connector part-way through a 50-insert batch and restarts it, expecting the
     * remaining records to resume at primary key 20.
     *
     * <p>The first run consumes 18 records (keys 2..19). After the restart 30 records with
     * keys 20..49 are verified. Annotated to be skipped for the WAL2JSON decoder.
     */
    @Test
    @SkipWhenDecoderPluginNameIs(value = SkipWhenDecoderPluginNameIs.DecoderPluginName.WAL2JSON, reason = "wal2json cannot resume transaction in the middle of processing")
    @FixFor({"DBZ-1824"})
    public void stopInTheMiddleOfTxAndResume() throws Exception {
        Testing.Print.enable();

        // NOTE(review): the third argument matches the test_table record with pk == 20;
        // presumably it is the condition on which the connector halts - confirm against startConnector.
        startConnector(
                config -> config.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false),
                true,
                record -> "test_server.public.test_table.Envelope".equals(record.valueSchema().name())
                        && ((Struct) record.value()).getStruct("after").getInt32("pk").intValue() == 20);
        waitForStreamingToStart();

        final String topic = TestHelper.topicName("public.test_table");
        consumer = testConsumer(18, new String[0]);
        executeAndWait(IntStream.rangeClosed(2, 51)
                .boxed()
                .map(n -> "INSERT INTO test_table (text) VALUES ('insert" + n + "')")
                .collect(Collectors.joining(";")));

        // First run: keys 2..19.
        for (int pk = 2; pk < 20; pk++) {
            SourceRecord record = consumer.remove();
            TestCase.assertEquals(topic, record.topic());
            VerifyRecord.isValidInsert(record, "pk", pk);
        }

        stopConnector();
        startConnector(Function.identity(), false);

        // Second run: keys 20..49.
        consumer.expects(30);
        consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        for (int pk = 20; pk < 50; pk++) {
            SourceRecord record = consumer.remove();
            TestCase.assertEquals(topic, record.topic());
            VerifyRecord.isValidInsert(record, "pk", pk);
        }
    }

    /**
     * Restarts the connector while one transaction is still open and checks that no change is
     * lost and that records arrive in the expected order.
     *
     * <p>Two connections are used: {@code tx1} with auto-commit off (its inserts stay
     * uncommitted until the explicit {@code commit()}), and {@code tx2} with auto-commit on.
     * Before the restart only {@code tx-2-1} is expected. After the restart the expected order
     * is {@code tx-2-2}, {@code tx-2-3}, then {@code tx-1-1}, {@code tx-1-2}, {@code tx-1-3}.
     *
     * <p>Both connections are opened in a try-with-resources block so they are closed even if
     * an assertion fails. Previously they were never closed.
     */
    @Test
    @SkipWhenDecoderPluginNameIs(value = SkipWhenDecoderPluginNameIs.DecoderPluginName.WAL2JSON, reason = "wal2json cannot resume transaction in the middle of processing")
    @FixFor({"DBZ-2397"})
    public void restartConnectorInTheMiddleOfUncommittedTx() throws Exception {
        Testing.Print.enable();
        try (PostgresConnection tx1 = TestHelper.create();
                PostgresConnection tx2 = TestHelper.create()) {
            tx1.setAutoCommit(false);
            tx2.setAutoCommit(true);

            startConnector(builder -> builder.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false), true);
            waitForStreamingToStart();

            // tx1 stays open across the restart; tx2 commits immediately.
            tx1.executeWithoutCommitting(new String[]{"INSERT INTO test_table (text) VALUES ('tx-1-1')"});
            tx2.execute(new String[]{"INSERT INTO test_table (text) VALUES ('tx-2-1')"});
            this.consumer = testConsumer(1, new String[0]);
            this.consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
            Assertions.assertThat(((Struct) this.consumer.remove().value()).getStruct("after").getString("text")).isEqualTo("tx-2-1");

            stopConnector();
            startConnector(Function.identity(), false);
            waitForStreamingToStart();

            tx1.executeWithoutCommitting(new String[]{"INSERT INTO test_table (text) VALUES ('tx-1-2')"});
            tx2.execute(new String[]{"INSERT INTO test_table (text) VALUES ('tx-2-2')"});
            tx1.executeWithoutCommitting(new String[]{"INSERT INTO test_table (text) VALUES ('tx-1-3')"});
            tx2.execute(new String[]{"INSERT INTO test_table (text) VALUES ('tx-2-3')"});
            tx1.commit();

            this.consumer = testConsumer(5, new String[0]);
            this.consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);

            // Auto-committed rows first, then the three rows of the late-committed transaction.
            for (String expectedText : new String[]{"tx-2-2", "tx-2-3", "tx-1-1", "tx-1-2", "tx-1-3"}) {
                Assertions.assertThat(((Struct) this.consumer.remove().value()).getStruct("after").getString("text")).isEqualTo(expectedText);
            }
        }
    }

    /**
     * Restarts the connector with an in-memory offset store and {@code SnapshotMode.NEVER}
     * while keeping the replication slot, and checks which records are delivered afterwards.
     *
     * <p>One insert is consumed in the first run. Two more rows are inserted while the
     * connector is stopped. After the restart, {@code insert3} and {@code insert4} must arrive;
     * unless the decoder plugin name ends in "streaming", {@code insert2} is expected again
     * ahead of them.
     *
     * <p>The plugin name is lower-cased with {@code Locale.ROOT} so that the suffix check does
     * not depend on the JVM's default locale.
     */
    @Test
    @FixFor({"DBZ-1730"})
    public void shouldStartConsumingFromSlotLocation() throws Exception {
        Testing.Print.enable();
        startConnector(builder -> builder
                .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false)
                .with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class), true);
        waitForStreamingToStart();

        this.consumer = testConsumer(1, new String[0]);
        executeAndWait("INSERT INTO test_table (text) VALUES ('insert2')");
        this.consumer.remove();
        stopConnector();

        // Changes made while the connector is down.
        TestHelper.execute("INSERT INTO test_table (text) VALUES ('insert3');", "INSERT INTO test_table (text) VALUES ('insert4')");

        startConnector(builder -> builder
                .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, true)
                .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER)
                .with(EmbeddedEngine.OFFSET_STORAGE, MemoryOffsetBackingStore.class), false);

        // Locale-independent lower-casing; a default-locale toLowerCase() can change 'I' (e.g. Turkish).
        boolean streamingPlugin = TestHelper.decoderPlugin().name().toLowerCase(java.util.Locale.ROOT).endsWith("streaming");
        this.consumer.expects(streamingPlugin ? 2 : 3);
        this.consumer.await(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS);
        if (!streamingPlugin) {
            Assertions.assertThat(((Struct) this.consumer.remove().value()).getStruct("after").getString("text")).isEqualTo("insert2");
        }
        Assertions.assertThat(((Struct) this.consumer.remove().value()).getStruct("after").getString("text")).isEqualTo("insert3");
        Assertions.assertThat(((Struct) this.consumer.remove().value()).getStruct("after").getString("text")).isEqualTo("insert4");
        stopConnector();
    }

    /**
     * Stops the connector part-way through a 50-insert batch and restarts it, expecting the
     * whole batch to be delivered again from the start.
     *
     * <p>The first run consumes 18 records (keys 2..19). After the restart 50 records with
     * keys 2..51 are verified. Annotated to be skipped unless the decoder plugin is WAL2JSON.
     */
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.WAL2JSON, reason = "wal2json cannot resume transaction in the middle of processing")
    @FixFor({"DBZ-1824"})
    public void stopInTheMiddleOfTxAndRestart() throws Exception {
        Testing.Print.enable();

        // NOTE(review): the third argument matches the test_table record with pk == 20;
        // presumably it is the condition on which the connector halts - confirm against startConnector.
        startConnector(
                config -> config.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false),
                true,
                record -> "test_server.public.test_table.Envelope".equals(record.valueSchema().name())
                        && ((Struct) record.value()).getStruct("after").getInt32("pk").intValue() == 20);
        waitForStreamingToStart();

        final String topic = TestHelper.topicName("public.test_table");
        consumer = testConsumer(18, new String[0]);
        executeAndWait(IntStream.rangeClosed(2, 51)
                .boxed()
                .map(n -> "INSERT INTO test_table (text) VALUES ('insert" + n + "')")
                .collect(Collectors.joining(";")));

        // First run: keys 2..19.
        for (int pk = 2; pk < 20; pk++) {
            SourceRecord record = consumer.remove();
            TestCase.assertEquals(topic, record.topic());
            VerifyRecord.isValidInsert(record, "pk", pk);
        }

        stopConnector();
        startConnector(Function.identity(), false);

        // Second run: the full batch again, keys 2..51.
        consumer.expects(50);
        consumer.await(TestHelper.waitTimeForRecords() * 30, TimeUnit.SECONDS);
        for (int pk = 2; pk < 52; pk++) {
            SourceRecord record = consumer.remove();
            TestCase.assertEquals(topic, record.topic());
            VerifyRecord.isValidInsert(record, "pk", pk);
        }
    }

    /**
     * With {@code TruncateHandlingMode.INCLUDE}, a {@code TRUNCATE} of {@code test_table} is
     * expected to yield exactly one valid truncate record after a preceding insert record.
     *
     * <p>Annotated to be skipped unless the decoder plugin is PGOUTPUT, and for database
     * versions below 11.
     */
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Tests specifically that pgoutput handles TRUNCATE messages")
    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 11, reason = "TRUNCATE events only supported in PG11+ PGOUTPUT Plugin")
    public void shouldProcessTruncateMessages() throws Exception {
        startConnector(config -> config.with(
                PostgresConnectorConfig.TRUNCATE_HANDLING_MODE,
                PostgresConnectorConfig.TruncateHandlingMode.INCLUDE));
        waitForStreamingToStart();

        // A plain insert first.
        consumer = testConsumer(1, new String[0]);
        executeAndWait("INSERT INTO test_table (text) values ('TRUNCATE TEST');");
        SourceRecord insertRecord = consumer.remove();
        TestCase.assertEquals(TestHelper.topicName("public.test_table"), insertRecord.topic());
        VerifyRecord.isValidInsert(insertRecord, "pk", 2);

        // Then the truncate, which must surface as a single record.
        consumer.expects(1);
        TestHelper.execute("TRUNCATE TABLE public.test_table RESTART IDENTITY CASCADE;", new String[0]);
        consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        Assert.assertFalse(consumer.isEmpty());

        SourceRecord truncateRecord = consumer.remove();
        Assert.assertNotNull(truncateRecord);
        VerifyRecord.isValidTruncate(truncateRecord);
        TestCase.assertTrue(consumer.isEmpty());
    }

    /**
     * Truncates two tables with a single statement and expects two valid truncate records
     * that share the same {@code lsn_commit}, {@code lsn} and {@code txId} offset entries.
     *
     * <p>Annotated to be skipped unless the decoder plugin is PGOUTPUT, and for database
     * versions below 11.
     */
    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Tests specifically that pgoutput handled TRUNCATE these messages")
    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 11, reason = "TRUNCATE events only supported in PG11+ PGOUTPUT Plugin")
    public void shouldProcessTruncateMessagesForMultipleTableTruncateStatement() throws Exception {
        TestHelper.execute("CREATE TABLE test_table_2 (pk SERIAL, text TEXT, PRIMARY KEY(pk));", new String[0]);
        startConnector(config -> config.with(
                PostgresConnectorConfig.TRUNCATE_HANDLING_MODE,
                PostgresConnectorConfig.TruncateHandlingMode.INCLUDE));
        waitForStreamingToStart();

        // One insert into each table.
        consumer = testConsumer(1, new String[0]);
        executeAndWait("INSERT INTO test_table (text) values ('TRUNCATE TEST');");
        SourceRecord firstInsert = consumer.remove();
        TestCase.assertEquals(TestHelper.topicName("public.test_table"), firstInsert.topic());
        VerifyRecord.isValidInsert(firstInsert, "pk", 2);

        executeAndWait("INSERT INTO test_table_2 (text) values ('TRUNCATE TEST 2');");
        SourceRecord secondInsert = consumer.remove();
        TestCase.assertEquals(TestHelper.topicName("public.test_table_2"), secondInsert.topic());
        VerifyRecord.isValidInsert(secondInsert, "pk", 1);

        // A single TRUNCATE naming both tables.
        consumer.expects(2);
        TestHelper.execute("TRUNCATE TABLE public.test_table, public.test_table_2;", new String[0]);
        consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        Assert.assertFalse(consumer.isEmpty());

        SourceRecord firstTruncate = consumer.remove();
        Assert.assertNotNull(firstTruncate);
        VerifyRecord.isValidTruncate(firstTruncate);

        SourceRecord secondTruncate = consumer.remove();
        Assert.assertNotNull(secondTruncate);
        VerifyRecord.isValidTruncate(secondTruncate);
        TestCase.assertTrue(consumer.isEmpty());

        // Both records must carry identical offset positions.
        TestCase.assertEquals(firstTruncate.sourceOffset().get("lsn_commit"), secondTruncate.sourceOffset().get("lsn_commit"));
        TestCase.assertEquals(firstTruncate.sourceOffset().get("lsn"), secondTruncate.sourceOffset().get("lsn"));
        TestCase.assertEquals(firstTruncate.sourceOffset().get("txId"), secondTruncate.sourceOffset().get("txId"));

        // NOTE(review): nothing is asserted about this final insert; presumably it only
        // confirms that streaming still works after the truncate - confirm intent.
        consumer = testConsumer(1, new String[0]);
        executeAndWait("INSERT INTO test_table (text) values ('TRUNCATE TEST');");
    }

    /**
     * Streams an insert into a table that has a {@code money} column and a column of the
     * domain type {@code money2} (declared over {@code money}), and checks the schema and
     * value emitted for each column.
     *
     * <p>Expected decimals are built from string literals rather than {@code double} values,
     * so the expected value and scale are stated exactly instead of depending on a binary
     * floating-point representation.
     */
    @Test
    @FixFor({"DBZ-1413"})
    public void shouldStreamChangesForDataTypeAlias() throws Exception {
        TestHelper.execute("CREATE DOMAIN money2 AS money DEFAULT 0.0;", new String[0]);
        TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, data VARCHAR(50), salary money, salary2 money2, PRIMARY KEY(pk));", new String[0]);
        startConnector(builder -> builder
                .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE)
                .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
                .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL)
                .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.alias_table"), false);
        waitForStreamingToStart();

        this.consumer = testConsumer(1, new String[0]);
        executeAndWait("INSERT INTO alias_table (data, salary, salary2) values ('hello', 7.25, 8.25);");

        SourceRecord inserted = assertRecordInserted("public.alias_table", "pk", 1);
        assertSourceInfo(inserted, "postgres", "public", "alias_table");
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("pk", SchemaBuilder.INT32_SCHEMA, 1),
                        new AbstractRecordsProducerTest.SchemaAndValueField("data", SchemaBuilder.OPTIONAL_STRING_SCHEMA, "hello"),
                        new AbstractRecordsProducerTest.SchemaAndValueField("salary", Decimal.builder(2).optional().build(), new BigDecimal("7.25")),
                        new AbstractRecordsProducerTest.SchemaAndValueField("salary2", Decimal.builder(2).optional().build(), new BigDecimal("8.25"))),
                inserted,
                "after");
        Assertions.assertThat(this.consumer.isEmpty()).isTrue();
    }

    /**
     * Adds two domain-typed columns ({@code money2} over {@code money}, {@code money3} over
     * {@code numeric(8,3)}) to a table after streaming has started, then checks the schema and values
     * of a streamed insert. Source type propagation is configured for {@code salary3} only.
     */
    @Test
    @FixFor({"DBZ-1413"})
    public void shouldStreamChangesForDomainAliasAlterTable() throws Exception {
        TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, data VARCHAR(50), salary money, PRIMARY KEY(pk));");

        startConnector(
                config -> config
                        .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE)
                        .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
                        .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER)
                        .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.alias_table")
                        .with("column.propagate.source.type", "public.alias_table.salary3"),
                false);
        waitForStreamingToStart();

        // Domains and the columns using them are created only once streaming is running.
        TestHelper.execute("CREATE DOMAIN money2 AS money DEFAULT 0.0;");
        TestHelper.execute("CREATE DOMAIN money3 AS numeric(8,3) DEFAULT 0.0;");
        TestHelper.execute("ALTER TABLE alias_table ADD COLUMN salary2 money2 NOT NULL;");
        TestHelper.execute("ALTER TABLE alias_table ADD COLUMN salary3 money3 NOT NULL;");

        consumer = testConsumer(1);
        executeAndWait("INSERT INTO alias_table (data, salary, salary2, salary3) values ('hello', 7.25, 8.25, 123.456);");

        SourceRecord inserted = assertRecordInserted("public.alias_table", "pk", 1);
        assertSourceInfo(inserted, "postgres", "public", "alias_table");

        Schema salarySchema = Decimal.builder(2).optional().build();
        Schema salary2Schema = Decimal.builder(2).build();
        Schema salary3Schema = SchemaBuilder.float64()
                .parameter("__debezium.source.column.type", "MONEY3")
                .parameter("__debezium.source.column.length", "8")
                .parameter("__debezium.source.column.scale", "3")
                .build();

        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("pk", Schema.INT32_SCHEMA, 1),
                        new AbstractRecordsProducerTest.SchemaAndValueField("data", Schema.OPTIONAL_STRING_SCHEMA, "hello"),
                        new AbstractRecordsProducerTest.SchemaAndValueField("salary", salarySchema, new BigDecimal("7.25")),
                        new AbstractRecordsProducerTest.SchemaAndValueField("salary2", salary2Schema, new BigDecimal("8.25")),
                        new AbstractRecordsProducerTest.SchemaAndValueField("salary3", salary3Schema, 123.456d)),
                inserted,
                "after");
        Assertions.assertThat(consumer.isEmpty()).isTrue();
    }

    /**
     * Adds a column typed with a domain over {@code varbit(3)} while streaming and expects the
     * inserted value {@code B'101'} to arrive with a {@code Bits} schema of length 3.
     */
    @Test
    @FixFor({"DBZ-1413"})
    public void shouldStreamDomainAliasWithProperModifiers() throws Exception {
        TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, PRIMARY KEY(pk));");

        startConnector(
                config -> config
                        .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE)
                        .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
                        .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER)
                        .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.alias_table"),
                false);
        waitForStreamingToStart();

        TestHelper.execute("CREATE DOMAIN varbit2 AS varbit(3);");
        TestHelper.execute("ALTER TABLE public.alias_table ADD COLUMN value varbit2 NOT NULL;");

        consumer = testConsumer(1);
        executeAndWait("INSERT INTO public.alias_table (value) VALUES (B'101');");

        SourceRecord inserted = assertRecordInserted("public.alias_table", "pk", 1);
        assertSourceInfo(inserted, "postgres", "public", "alias_table");

        AbstractRecordsProducerTest.SchemaAndValueField pk =
                new AbstractRecordsProducerTest.SchemaAndValueField("pk", Schema.INT32_SCHEMA, 1);
        // B'101' is expected as the single byte 5.
        AbstractRecordsProducerTest.SchemaAndValueField bits =
                new AbstractRecordsProducerTest.SchemaAndValueField("value", Bits.builder(3).build(), new byte[]{5});

        assertRecordSchemaAndValues(Arrays.asList(pk, bits), inserted, "after");
        Assertions.assertThat(consumer.isEmpty()).isTrue();
    }

    /**
     * Streams an insert into a column whose type is a domain ({@code numericex}) declared over
     * another domain ({@code numeric82}, itself {@code numeric(8,2)}), and checks the propagated
     * source type, length and scale parameters on the emitted schema.
     */
    @Test
    @FixFor({"DBZ-1413"})
    public void shouldStreamValuesForDomainTypeOfDomainType() throws Exception {
        TestHelper.execute("CREATE DOMAIN numeric82 as numeric(8,2);");
        TestHelper.execute("CREATE DOMAIN numericex as numeric82;");
        TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, value numericex, PRIMARY KEY (pk));");

        startConnector(
                config -> config
                        .with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE)
                        .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
                        .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER)
                        .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.alias_table")
                        .with("column.propagate.source.type", "public.alias_table.value"),
                false);
        waitForStreamingToStart();

        consumer = testConsumer(1);
        executeAndWait("INSERT INTO alias_table (value) values (123.45);");

        SourceRecord inserted = assertRecordInserted("public.alias_table", "pk", 1);
        assertSourceInfo(inserted, "postgres", "public", "alias_table");

        Schema valueSchema = SpecialValueDecimal.builder(JdbcValueConverters.DecimalMode.DOUBLE, 8, 2)
                .optional()
                .parameter("__debezium.source.column.type", "NUMERICEX")
                .parameter("__debezium.source.column.length", "8")
                .parameter("__debezium.source.column.scale", "2")
                .build();

        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("pk", Schema.INT32_SCHEMA, 1),
                        new AbstractRecordsProducerTest.SchemaAndValueField("value", valueSchema, 123.45d)),
                inserted,
                "after");
        Assertions.assertThat(consumer.isEmpty()).isTrue();
    }

    @Test
    @FixFor({"DBZ-1413"})
    public void shouldStreamValuesForAliasLikeBaseTypes() throws Exception {
        TestHelper.execute("CREATE TABLE alias_table (pk SERIAL, PRIMARY KEY (pk));", new String[0]);
        startConnector(builder -> {
            return builder.with(PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.DOUBLE).with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER).with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.alias_table");
        }, false);
        waitForStreamingToStart();
        TestHelper.execute("CREATE DOMAIN bit2 AS BIT(3);", new String[0]);
        TestHelper.execute("CREATE DOMAIN smallint2 AS smallint;", new String[0]);
        TestHelper.execute("CREATE DOMAIN integer2 as integer;", new String[0]);
        TestHelper.execute("CREATE DOMAIN bigint2 as bigint;", new String[0]);
        TestHelper.execute("CREATE DOMAIN real2 as real;", new String[0]);
        TestHelper.execute("CREATE DOMAIN bool2 AS BOOL DEFAULT false;", new String[0]);
        TestHelper.execute("CREATE DOMAIN float82 as float8;", new String[0]);
        TestHelper.execute("CREATE DOMAIN numeric2 as numeric(6,2);", new String[0]);
        TestHelper.execute("CREATE DOMAIN string2 AS varchar(25) DEFAULT NULL;", new String[0]);
        TestHelper.execute("CREATE DOMAIN date2 AS date;", new String[0]);
        TestHelper.execute("CREATE DOMAIN time2 as time;", new String[0]);
        TestHelper.execute("CREATE DOMAIN timetz2 as timetz;", new String[0]);
        TestHelper.execute("CREATE DOMAIN timestamp2 as timestamp;", new String[0]);
        TestHelper.execute("CREATE DOMAIN timestamptz2 AS timestamptz;", new String[0]);
        TestHelper.execute("CREATE DOMAIN timewotz2 as time without time zone;", new String[0]);
        TestHelper.execute("CREATE DOMAIN box2 as box;", new String[0]);
        TestHelper.execute("CREATE DOMAIN circle2 as circle;", new String[0]);
        TestHelper.execute("CREATE DOMAIN interval2 as interval;", new String[0]);
        TestHelper.execute("CREATE DOMAIN line2 as line;", new String[0]);
        TestHelper.execute("CREATE DOMAIN lseg2 as lseg;", new String[0]);
        TestHelper.execute("CREATE DOMAIN path2 as path;", new String[0]);
        TestHelper.execute("CREATE DOMAIN point2 as point;", new String[0]);
        TestHelper.execute("CREATE DOMAIN polygon2 as polygon;", new String[0]);
        TestHelper.execute("CREATE DOMAIN char2 as char;", new String[0]);
        TestHelper.execute("CREATE DOMAIN text2 as text;", new String[0]);
        TestHelper.execute("CREATE DOMAIN json2 as json;", new String[0]);
        TestHelper.execute("CREATE DOMAIN xml2 as xml;", new String[0]);
        TestHelper.execute("CREATE DOMAIN uuid2 as uuid;", new String[0]);
        TestHelper.execute("CREATE DOMAIN varbit2 as varbit(3);", new String[0]);
        TestHelper.execute("CREATE DOMAIN inet2 as inet;", new String[0]);
        TestHelper.execute("CREATE DOMAIN cidr2 as cidr;", new String[0]);
        TestHelper.execute("CREATE DOMAIN macaddr2 as macaddr;", new String[0]);
        TestHelper.execute("ALTER TABLE alias_table ADD COLUMN bit_base bit(3) NOT NULL, ADD COLUMN bit_alias bit2 NOT NULL, ADD COLUMN smallint_base smallint NOT NULL, ADD COLUMN smallint_alias smallint2 NOT NULL, ADD COLUMN integer_base integer NOT NULL, ADD COLUMN integer_alias integer2 NOT NULL, ADD COLUMN bigint_base bigint NOT NULL, ADD COLUMN bigint_alias bigint2 NOT NULL, ADD COLUMN real_base real NOT NULL, ADD COLUMN real_alias real2 NOT NULL, ADD COLUMN float8_base float8 NOT NULL, ADD COLUMN float8_alias float82 NOT NULL, ADD COLUMN numeric_base numeric(6,2) NOT NULL, ADD COLUMN numeric_alias numeric2 NOT NULL, ADD COLUMN bool_base bool NOT NULL, ADD COLUMN bool_alias bool2 NOT NULL, ADD COLUMN string_base varchar(25) NOT NULL, ADD COLUMN string_alias string2 NOT NULL, ADD COLUMN date_base date NOT NULL, ADD COLUMN date_alias date2 NOT NULL, ADD COLUMN time_base time NOT NULL, ADD COLUMN time_alias time2 NOT NULL, ADD COLUMN timetz_base timetz NOT NULL, ADD COLUMN timetz_alias timetz2 NOT NULL, ADD COLUMN timestamp_base timestamp NOT NULL, ADD COLUMN timestamp_alias timestamp2 NOT NULL, ADD COLUMN timestamptz_base timestamptz NOT NULL, ADD COLUMN timestamptz_alias timestamptz2 NOT NULL, ADD COLUMN timewottz_base time without time zone NOT NULL, ADD COLUMN timewottz_alias timewotz2 NOT NULL, ADD COLUMN box_base box NOT NULL, ADD COLUMN box_alias box2 NOT NULL, ADD COLUMN circle_base circle NOT NULL, ADD COLUMN circle_alias circle2 NOT NULL, ADD COLUMN interval_base interval NOT NULL, ADD COLUMN interval_alias interval2 NOT NULL, ADD COLUMN line_base line NOT NULL, ADD COLUMN line_alias line2 NOT NULL, ADD COLUMN lseg_base lseg NOT NULL, ADD COLUMN lseg_alias lseg2 NOT NULL, ADD COLUMN path_base path NOT NULL, ADD COLUMN path_alias path2 NOT NULL, ADD COLUMN point_base point NOT NULL, ADD COLUMN point_alias point2 NOT NULL, ADD COLUMN polygon_base polygon NOT NULL, ADD COLUMN polygon_alias polygon2 NOT NULL, ADD COLUMN char_base char NOT NULL, ADD COLUMN 
char_alias char2 NOT NULL, ADD COLUMN text_base text NOT NULL, ADD COLUMN text_alias text2 NOT NULL, ADD COLUMN json_base json NOT NULL, ADD COLUMN json_alias json2 NOT NULL, ADD COLUMN xml_base xml NOT NULL, ADD COLUMN xml_alias xml2 NOT NULL, ADD COLUMN uuid_base UUID NOT NULL, ADD COLUMN uuid_alias uuid2 NOT NULL, ADD COLUMN varbit_base varbit(3) NOT NULL, ADD COLUMN varbit_alias varbit2 NOT NULL,ADD COLUMN inet_base inet NOT NULL, ADD COLUMN inet_alias inet2 NOT NULL, ADD COLUMN cidr_base cidr NOT NULL, ADD COLUMN cidr_alias cidr2 NOT NULL, ADD COLUMN macaddr_base macaddr NOT NULL, ADD COLUMN macaddr_alias macaddr2 NOT NULL", new String[0]);
        this.consumer = testConsumer(1, new String[0]);
        executeAndWait("INSERT INTO alias_table (bit_base, bit_alias, smallint_base, smallint_alias, integer_base, integer_alias, bigint_base, bigint_alias, real_base, real_alias, float8_base, float8_alias, numeric_base, numeric_alias, bool_base, bool_alias, string_base, string_alias, date_base, date_alias, time_base, time_alias, timetz_base, timetz_alias, timestamp_base, timestamp_alias, timestamptz_base, timestamptz_alias, timewottz_base, timewottz_alias, box_base, box_alias, circle_base, circle_alias, interval_base, interval_alias, line_base, line_alias, lseg_base, lseg_alias, path_base, path_alias, point_base, point_alias, polygon_base, polygon_alias, char_base, char_alias, text_base, text_alias, json_base, json_alias, xml_base, xml_alias, uuid_base, uuid_alias, varbit_base, varbit_alias, inet_base, inet_alias, cidr_base, cidr_alias, macaddr_base, macaddr_alias ) VALUES (B'101', B'101', 1, 1, 1, 1, 1000, 1000, 3.14, 3.14, 3.14, 3.14, 1234.12, 1234.12, true, true, 'hello', 'hello', '2019-10-02', '2019-10-02', '01:02:03', '01:02:03', '01:02:03.123789Z', '01:02:03.123789Z', '2019-10-02T01:02:03.123456', '2019-10-02T01:02:03.123456', '2019-10-02T13:51:30.123456+02:00'::TIMESTAMPTZ, '2019-10-02T13:51:30.123456+02:00'::TIMESTAMPTZ, '01:02:03', '01:02:03', '(0,0),(1,1)', '(0,0),(1,1)', '10,4,10', '10,4,10', '1 year 2 months 3 days 4 hours 5 minutes 6 seconds', '1 year 2 months 3 days 4 hours 5 minutes 6 seconds', '(0,0),(0,1)', '(0,0),(0,1)', '((0,0),(0,1))', '((0,0),(0,1))', '((0,0),(0,1),(0,2))', '((0,0),(0,1),(0,2))', '(1,1)', '(1,1)', '((0,0),(0,1),(1,0),(0,0))', '((0,0),(0,1),(1,0),(0,0))', 'a', 'a', 'Hello World', 'Hello World', '{\"key\": \"value\"}', '{\"key\": \"value\"}', XML('<foo>Hello</foo>'), XML('<foo>Hello</foo>'), '40e6215d-b5c6-4896-987c-f30f3678f608', '40e6215d-b5c6-4896-987c-f30f3678f608', B'101', B'101', '192.168.0.1', '192.168.0.1', '192.168/24', '192.168/24', '08:00:2b:01:02:03', '08:00:2b:01:02:03' );");
        SourceRecord assertRecordInserted = assertRecordInserted("public.alias_table", "pk", 1);
        assertSourceInfo(assertRecordInserted, "postgres", "public", "alias_table");
        assertRecordSchemaAndValues(schemasAndValuesForDomainAliasTypes(true), assertRecordInserted, "after");
        Assertions.assertThat(this.consumer.isEmpty()).isTrue();
    }

    /**
     * Creates an enum type and adds a column of that type while streaming, then expects the inserted
     * value to be emitted with an {@code Enum} schema that carries the propagated source type
     * parameters.
     */
    @Test
    @FixFor({"DBZ-920"})
    public void shouldStreamEnumAsKnownType() throws Exception {
        TestHelper.execute("CREATE TABLE enum_table (pk SERIAL, PRIMARY KEY (pk));");

        startConnector(
                config -> config
                        .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true)
                        .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER)
                        .with("column.propagate.source.type", "public.enum_table.value")
                        .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.enum_table"),
                false);
        waitForStreamingToStart();

        TestHelper.execute("CREATE TYPE test_type AS ENUM ('V1','V2');");
        TestHelper.execute("ALTER TABLE enum_table ADD COLUMN value test_type NOT NULL");

        consumer = testConsumer(1);
        executeAndWait("INSERT INTO enum_table (value) VALUES ('V1');");

        SourceRecord inserted = assertRecordInserted("public.enum_table", "pk", 1);
        assertSourceInfo(inserted, "postgres", "public", "enum_table");

        Schema enumSchema = Enum.builder("V1,V2")
                .parameter("__debezium.source.column.type", "TEST_TYPE")
                .parameter("__debezium.source.column.length", String.valueOf(Integer.MAX_VALUE))
                .parameter("__debezium.source.column.scale", "0")
                .build();

        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("pk", Schema.INT32_SCHEMA, 1),
                        new AbstractRecordsProducerTest.SchemaAndValueField("value", enumSchema, "V1")),
                inserted,
                "after");
        Assertions.assertThat(consumer.isEmpty()).isTrue();
    }

    /**
     * Adds an enum-array column while streaming (with unknown data types excluded) and checks the
     * records emitted for an insert, an update and a delete of a single row.
     *
     * <p>The source info check after the DELETE is made against the delete record itself rather than
     * the previously consumed update record, matching what
     * {@code shouldStreamTimeArrayTypesAsKnownTypes} does.
     */
    @Test
    public void shouldStreamEnumArrayAsKnownType() throws Exception {
        TestHelper.execute("CREATE TABLE enum_array_table (pk SERIAL, PRIMARY KEY (pk));");

        startConnector(
                builder -> builder
                        .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, false)
                        .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER)
                        .with("column.propagate.source.type", "public.enum_array_table.value")
                        .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.enum_array_table"),
                false);
        waitForStreamingToStart();

        TestHelper.execute("CREATE TYPE test_type AS ENUM ('V1','V2');");
        TestHelper.execute("ALTER TABLE enum_array_table ADD COLUMN value test_type[] NOT NULL;");

        this.consumer = testConsumer(1);

        // INSERT: both enum labels are expected in the array.
        executeAndWait("INSERT INTO enum_array_table (value) VALUES ('{V1, V2}');");
        SourceRecord insertRecord = assertRecordInserted("public.enum_array_table", "pk", 1);
        assertSourceInfo(insertRecord, "postgres", "public", "enum_array_table");
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("pk", Schema.INT32_SCHEMA, 1),
                        new AbstractRecordsProducerTest.SchemaAndValueField(
                                "value",
                                SchemaBuilder.array(Enum.builder("V1,V2"))
                                        .parameter("__debezium.source.column.type", "_TEST_TYPE")
                                        .parameter("__debezium.source.column.length", String.valueOf(Integer.MAX_VALUE))
                                        .parameter("__debezium.source.column.scale", "0")
                                        .build(),
                                Arrays.asList("V1", "V2"))),
                insertRecord,
                "after");
        Assertions.assertThat(this.consumer.isEmpty()).isTrue();

        // UPDATE: the array shrinks to a single label.
        executeAndWait("UPDATE enum_array_table set value = '{V1}';");
        SourceRecord updateRecord = this.consumer.remove();
        assertSourceInfo(updateRecord, "postgres", "public", "enum_array_table");
        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("pk", Schema.INT32_SCHEMA, 1),
                        new AbstractRecordsProducerTest.SchemaAndValueField(
                                "value",
                                SchemaBuilder.array(Enum.builder("V1,V2"))
                                        .parameter("__debezium.source.column.type", "_TEST_TYPE")
                                        .parameter("__debezium.source.column.length", String.valueOf(Integer.MAX_VALUE))
                                        .parameter("__debezium.source.column.scale", "0")
                                        .build(),
                                Arrays.asList("V1"))),
                updateRecord,
                "after");
        Assertions.assertThat(this.consumer.isEmpty()).isTrue();

        // DELETE: validate and inspect the delete record that was just consumed.
        executeAndWait("DELETE FROM enum_array_table;");
        SourceRecord deleteRecord = this.consumer.remove();
        VerifyRecord.isValidDelete(deleteRecord, "pk", 1);
        assertSourceInfo(deleteRecord, "postgres", "public", "enum_array_table");
        Assertions.assertThat(this.consumer.isEmpty()).isTrue();
    }

    /**
     * Streams an insert, an update and a delete against a table whose columns are arrays of
     * {@code time}, {@code timetz}, {@code timestamp} and {@code timestamptz}, with unknown data
     * types excluded, and checks the emitted schemas and values.
     */
    @Test
    @FixFor({"DBZ-1969"})
    public void shouldStreamTimeArrayTypesAsKnownTypes() throws Exception {
        TestHelper.execute("CREATE TABLE time_array_table (pk SERIAL, timea time[] NOT NULL, timetza timetz[] NOT NULL, timestampa timestamp[] NOT NULL, timestamptza timestamptz[] NOT NULL, primary key(pk));");

        startConnector(
                config -> config
                        .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, false)
                        .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER)
                        .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.time_array_table"),
                false);
        waitForStreamingToStart();

        consumer = testConsumer(1);

        // --- insert ---
        executeAndWait("INSERT INTO time_array_table (timea, timetza, timestampa, timestamptza) values ('{00:01:02,01:02:03}', '{13:51:02+0200,14:51:03+0200}', '{2020-04-01 00:01:02,2020-04-01 01:02:03}', '{2020-04-01 13:51:02+02,2020-04-01 14:51:03+02}')");
        SourceRecord inserted = assertRecordInserted("public.time_array_table", "pk", 1);
        assertSourceInfo(inserted, "postgres", "public", "time_array_table");
        assertRecordSchemaAndValues(schemaAndValuesForTimeArrayTypes(), inserted, "after");
        Assertions.assertThat(consumer.isEmpty()).isTrue();

        // --- update ---
        executeAndWait("UPDATE time_array_table SET timea = '{00:01:02,02:03:04}', timetza = '{00:01:02-0400,01:03:04-0400}', timestampa = '{2020-04-01 00:01:02,2020-04-25 03:04:05}', timestamptza = '{2020-04-01 00:01:02-04,2020-04-25 03:04:05-04}'");
        SourceRecord updated = consumer.remove();
        assertSourceInfo(updated, "postgres", "public", "time_array_table");

        // Time and timestamp values are expected as microsecond counts.
        long firstTimeMicros = LocalTime.parse("00:01:02").toNanoOfDay() / 1000;
        long secondTimeMicros = LocalTime.parse("02:03:04").toNanoOfDay() / 1000;
        long firstTimestampMicros = OffsetDateTime.of(2020, 4, 1, 0, 1, 2, 0, ZoneOffset.UTC).toInstant().toEpochMilli() * 1000;
        long secondTimestampMicros = OffsetDateTime.of(2020, 4, 25, 3, 4, 5, 0, ZoneOffset.UTC).toInstant().toEpochMilli() * 1000;

        Schema timeArraySchema = SchemaBuilder.array(MicroTime.builder().optional().build()).build();
        Schema zonedTimeArraySchema = SchemaBuilder.array(ZonedTime.builder().optional().build()).build();
        Schema timestampArraySchema = SchemaBuilder.array(MicroTimestamp.builder().optional().build()).build();
        Schema zonedTimestampArraySchema = SchemaBuilder.array(ZonedTimestamp.builder().optional().build()).build();

        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("timea", timeArraySchema, Arrays.asList(firstTimeMicros, secondTimeMicros)),
                        new AbstractRecordsProducerTest.SchemaAndValueField("timetza", zonedTimeArraySchema, Arrays.asList("04:01:02Z", "05:03:04Z")),
                        new AbstractRecordsProducerTest.SchemaAndValueField("timestampa", timestampArraySchema, Arrays.asList(firstTimestampMicros, secondTimestampMicros)),
                        new AbstractRecordsProducerTest.SchemaAndValueField("timestamptza", zonedTimestampArraySchema, Arrays.asList("2020-04-01T04:01:02Z", "2020-04-25T07:04:05Z"))),
                updated,
                "after");
        Assertions.assertThat(consumer.isEmpty()).isTrue();

        // --- delete ---
        executeAndWait("DELETE FROM time_array_table;");
        SourceRecord deleted = consumer.remove();
        VerifyRecord.isValidDelete(deleted, "pk", 1);
        assertSourceInfo(deleted, "postgres", "public", "time_array_table");
        Assertions.assertThat(consumer.isEmpty()).isTrue();
    }

    /**
     * With unknown data types excluded, inserts a row that relies on the enum column's default
     * ({@code 'V1'}) and expects the enum value to be emitted with an {@code Enum} schema carrying
     * the propagated source type parameters.
     */
    @Test
    @FixFor({"DBZ-1680"})
    public void shouldStreamEnumsWhenIncludeUnknownDataTypesDisabled() throws Exception {
        // The enum type and the table exist before the connector starts.
        TestHelper.execute("CREATE TYPE test_type AS ENUM ('V1','V2');");
        TestHelper.execute("CREATE TABLE enum_table (pk SERIAL, data varchar(25) NOT NULL, value test_type NOT NULL DEFAULT 'V1', PRIMARY KEY (pk));");

        startConnector(
                config -> config
                        .with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, false)
                        .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER)
                        .with("column.propagate.source.type", "public.enum_table.value")
                        .with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.enum_table"),
                false);
        waitForStreamingToStart();

        consumer = testConsumer(1);
        executeAndWait("INSERT INTO enum_table (data) VALUES ('hello');");

        SourceRecord inserted = assertRecordInserted("public.enum_table", "pk", 1);
        assertSourceInfo(inserted, "postgres", "public", "enum_table");

        Schema enumSchema = Enum.builder("V1,V2")
                .parameter("__debezium.source.column.type", "TEST_TYPE")
                .parameter("__debezium.source.column.length", String.valueOf(Integer.MAX_VALUE))
                .parameter("__debezium.source.column.scale", "0")
                .build();

        assertRecordSchemaAndValues(
                Arrays.asList(
                        new AbstractRecordsProducerTest.SchemaAndValueField("pk", Schema.INT32_SCHEMA, 1),
                        new AbstractRecordsProducerTest.SchemaAndValueField("data", Schema.STRING_SCHEMA, "hello"),
                        new AbstractRecordsProducerTest.SchemaAndValueField("value", enumSchema, "V1")),
                inserted,
                "after");
        Assertions.assertThat(consumer.isEmpty()).isTrue();
    }

    /**
     * Converts an ISO-8601 local date-time string, interpreted at UTC, to microseconds since the
     * epoch. Sub-microsecond digits are truncated by the integer division.
     *
     * @param str text accepted by {@link LocalDateTime#parse(CharSequence)}
     * @return whole seconds since the epoch times 1,000,000, plus the nanosecond part divided by 1,000
     */
    private long asEpochMicros(String str) {
        // Hold the instant in a local so seconds and nanos are read from the same value.
        final Instant instant = LocalDateTime.parse(str).atOffset(ZoneOffset.UTC).toInstant();
        return (instant.getEpochSecond() * 1_000_000L) + (instant.getNano() / 1_000);
    }

    /**
     * Runs two insert/update/delete rounds against {@code test_table} (REPLICA IDENTITY FULL): the
     * first with a 10,000-character random {@code text} value, the second with a {@code null} text.
     * The {@code text} column is only expected in update and delete images when
     * {@code DecoderDifferences.areToastedValuesPresentInSchema()} is true or the refresh mode is
     * {@code COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST}.
     *
     * @param schemaRefreshMode value set for {@code SCHEMA_REFRESH_MODE} on the connector
     * @param z when {@code true} the table is (re)created before the connector starts, otherwise
     *          after it has been started
     */
    private void testReceiveChangesForReplicaIdentityFullTableWithToastedValue(PostgresConnectorConfig.SchemaRefreshMode schemaRefreshMode, boolean z) throws Exception {
        if (z) {
            TestHelper.execute("DROP TABLE IF EXISTS test_table;", "CREATE TABLE test_table (id SERIAL, not_toast int, text TEXT);", "ALTER TABLE test_table REPLICA IDENTITY FULL");
        }
        startConnector(config -> config.with(PostgresConnectorConfig.SCHEMA_REFRESH_MODE, schemaRefreshMode), false);
        consumer = testConsumer(1);
        final String largeText = RandomStringUtils.randomAlphanumeric(10000);
        if (!z) {
            TestHelper.execute("DROP TABLE IF EXISTS test_table;", "CREATE TABLE test_table (id SERIAL, not_toast int, text TEXT);", "ALTER TABLE test_table REPLICA IDENTITY FULL");
        }

        // Expected column states, built once and combined below.
        final AbstractRecordsProducerTest.SchemaAndValueField firstId =
                new AbstractRecordsProducerTest.SchemaAndValueField("id", Schema.INT32_SCHEMA, 1);
        final AbstractRecordsProducerTest.SchemaAndValueField secondId =
                new AbstractRecordsProducerTest.SchemaAndValueField("id", Schema.INT32_SCHEMA, 2);
        final AbstractRecordsProducerTest.SchemaAndValueField notToast10 =
                new AbstractRecordsProducerTest.SchemaAndValueField("not_toast", Schema.OPTIONAL_INT32_SCHEMA, 10);
        final AbstractRecordsProducerTest.SchemaAndValueField notToast20 =
                new AbstractRecordsProducerTest.SchemaAndValueField("not_toast", Schema.OPTIONAL_INT32_SCHEMA, 20);
        final AbstractRecordsProducerTest.SchemaAndValueField notToast100 =
                new AbstractRecordsProducerTest.SchemaAndValueField("not_toast", Schema.OPTIONAL_INT32_SCHEMA, 100);
        final AbstractRecordsProducerTest.SchemaAndValueField notToast200 =
                new AbstractRecordsProducerTest.SchemaAndValueField("not_toast", Schema.OPTIONAL_INT32_SCHEMA, 200);
        final AbstractRecordsProducerTest.SchemaAndValueField largeTextField =
                new AbstractRecordsProducerTest.SchemaAndValueField("text", Schema.OPTIONAL_STRING_SCHEMA, largeText);
        final AbstractRecordsProducerTest.SchemaAndValueField nullTextField =
                new AbstractRecordsProducerTest.SchemaAndValueField("text", Schema.OPTIONAL_STRING_SCHEMA, null);

        // ---- round 1: row with the large text value ----
        assertInsert("INSERT INTO test_table (not_toast, text) VALUES (10,'" + largeText + "');", Arrays.asList(firstId, notToast10, largeTextField));

        consumer.expects(1);
        executeAndWait("UPDATE test_table set not_toast = 20");
        final SourceRecord firstUpdate = consumer.remove();
        if (!DecoderDifferences.areToastedValuesPresentInSchema() && schemaRefreshMode != PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST) {
            assertRecordSchemaAndValues(Arrays.asList(firstId, notToast10), firstUpdate, "before");
            assertRecordSchemaAndValues(Arrays.asList(firstId, notToast20), firstUpdate, "after");
        } else {
            assertRecordSchemaAndValues(Arrays.asList(firstId, notToast10, largeTextField), firstUpdate, "before");
            assertRecordSchemaAndValues(Arrays.asList(firstId, notToast20, largeTextField), firstUpdate, "after");
        }

        // Two records are expected for the delete; the second must have no value and no value schema.
        consumer.expects(2);
        executeAndWait("DELETE FROM test_table");
        final SourceRecord firstDelete = consumer.remove();
        final SourceRecord firstDeleteFollowUp = consumer.remove();
        Assertions.assertThat(firstDeleteFollowUp.value()).isNull();
        Assertions.assertThat(firstDeleteFollowUp.valueSchema()).isNull();
        if (!DecoderDifferences.areToastedValuesPresentInSchema() && schemaRefreshMode != PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST) {
            assertRecordSchemaAndValues(Arrays.asList(firstId, notToast20), firstDelete, "before");
        } else {
            assertRecordSchemaAndValues(Arrays.asList(firstId, notToast20, largeTextField), firstDelete, "before");
        }

        // ---- round 2: row with a null text value ----
        consumer.expects(1);
        assertInsert("INSERT INTO test_table (not_toast, text) VALUES (100, null);", Arrays.asList(secondId, notToast100, nullTextField));

        consumer.expects(1);
        executeAndWait("UPDATE test_table set not_toast = 200 WHERE id=2");
        final SourceRecord secondUpdate = consumer.remove();
        if (!DecoderDifferences.areToastedValuesPresentInSchema() && schemaRefreshMode != PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST) {
            assertRecordSchemaAndValues(Arrays.asList(secondId, notToast100), secondUpdate, "before");
            assertRecordSchemaAndValues(Arrays.asList(secondId, notToast200), secondUpdate, "after");
        } else {
            assertRecordSchemaAndValues(Arrays.asList(secondId, notToast100, nullTextField), secondUpdate, "before");
            assertRecordSchemaAndValues(Arrays.asList(secondId, notToast200, nullTextField), secondUpdate, "after");
        }

        consumer.expects(2);
        executeAndWait("DELETE FROM test_table WHERE id=2");
        final SourceRecord secondDelete = consumer.remove();
        final SourceRecord secondDeleteFollowUp = consumer.remove();
        Assertions.assertThat(secondDeleteFollowUp.value()).isNull();
        Assertions.assertThat(secondDeleteFollowUp.valueSchema()).isNull();
        if (!DecoderDifferences.areToastedValuesPresentInSchema() && schemaRefreshMode != PostgresConnectorConfig.SchemaRefreshMode.COLUMNS_DIFF_EXCLUDE_UNCHANGED_TOAST) {
            assertRecordSchemaAndValues(Arrays.asList(secondId, notToast200), secondDelete, "before");
        } else {
            assertRecordSchemaAndValues(Arrays.asList(secondId, notToast200, nullTextField), secondDelete, "before");
        }
    }

    /**
     * Starts the connector with a heartbeat interval of {@code "100"} and a heartbeat action query
     * that inserts into {@code test_heartbeat_table}, waits until a record with an envelope schema
     * has been consumed, and then asserts that the heartbeat table contains at least one row.
     */
    @Test
    @FixFor({"DBZ-1815"})
    public void testHeartbeatActionQueryExecuted() throws Exception {
        TestHelper.execute("DROP TABLE IF EXISTS test_table;CREATE TABLE test_table (id SERIAL, text TEXT);INSERT INTO test_table (text) VALUES ('mydata');");
        TestHelper.execute("DROP TABLE IF EXISTS test_heartbeat_table;CREATE TABLE test_heartbeat_table (text TEXT);");

        startConnector(builder -> builder
                .with(Heartbeat.HEARTBEAT_INTERVAL, "100")
                .with(DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY, "INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat');"));

        // Keep consuming until a record with an envelope schema arrives.
        Awaitility.await().atMost(TestHelper.waitTimeForRecords() * 10, TimeUnit.SECONDS).until(() -> {
            final SourceRecord record = consumeRecord();
            return record != null && Envelope.isEnvelopeSchema(record.valueSchema());
        });

        // Reads the single COUNT(*) value from the first row of the result set.
        final JdbcConnection.ResultSetMapper<Integer> countMapper = resultSet -> {
            resultSet.next();
            return resultSet.getInt(1);
        };

        final int heartbeatRows;
        // try-with-resources closes the connection exactly once, whether or not the query throws.
        try (PostgresConnection connection = TestHelper.create()) {
            heartbeatRows = connection.queryAndMap("SELECT COUNT(*) FROM test_heartbeat_table;", countMapper);
        }
        TestCase.assertTrue(heartbeatRows > 0);
    }

    /**
     * Checks which columns carry source-type schema parameters when
     * {@code datatype.propagate.source.type} is set to match NUMERIC, VARCHAR and FLOAT4.
     *
     * <p>After inserting one row, the schema of the {@code before} field is inspected:
     * {@code id}, {@code c1} and {@code c2} must have no parameters, while {@code c3a},
     * {@code c3b}, {@code f2} and {@code f1} must include the expected
     * {@code __debezium.source.column.*} entries.
     *
     * @throws Exception if executing SQL or starting the connector fails
     */
    @Test
    @FixFor({"DBZ-1916", "DBZ-1830"})
    public void shouldPropagateSourceTypeByDatatype() throws Exception {
        TestHelper.execute("DROP TABLE IF EXISTS test_table;", new String[0]);
        TestHelper.execute("CREATE TABLE test_table (id SERIAL, c1 INT, c2 INT, c3a NUMERIC(5,2), c3b VARCHAR(128), f1 float(10), f2 decimal(8,4), primary key (id));", new String[0]);
        startConnector(config -> {
            return config
                    .with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER)
                    .with("datatype.propagate.source.type", ".+\\.NUMERIC,.+\\.VARCHAR,.+\\.FLOAT4");
        }, false);
        waitForStreamingToStart();
        this.consumer = testConsumer(1, new String[0]);
        executeAndWait("INSERT INTO test_table (id,c1,c2,c3a,c3b,f1,f2) values (1, 123, 456, 789.01, 'test', 1.228, 234.56);");

        SourceRecord inserted = assertRecordInserted("public.test_table", "id", 1);
        Field beforeField = inserted.valueSchema().field("before");

        // Columns whose types are not matched by the propagation setting carry no parameters.
        Assertions.assertThat(beforeField.schema().field("id").schema().parameters()).isNull();
        Assertions.assertThat(beforeField.schema().field("c1").schema().parameters()).isNull();
        Assertions.assertThat(beforeField.schema().field("c2").schema().parameters()).isNull();

        // Matched columns expose their source type, length and (where applicable) scale.
        Assertions.assertThat(beforeField.schema().field("c3a").schema().parameters()).includes(
                MapAssert.entry("__debezium.source.column.type", "NUMERIC"),
                MapAssert.entry("__debezium.source.column.length", "5"),
                MapAssert.entry("__debezium.source.column.scale", "2"));
        Assertions.assertThat(beforeField.schema().field("c3b").schema().parameters()).includes(
                MapAssert.entry("__debezium.source.column.type", "VARCHAR"),
                MapAssert.entry("__debezium.source.column.length", "128"));
        Assertions.assertThat(beforeField.schema().field("f2").schema().parameters()).includes(
                MapAssert.entry("__debezium.source.column.type", "NUMERIC"),
                MapAssert.entry("__debezium.source.column.length", "8"),
                MapAssert.entry("__debezium.source.column.scale", "4"));
        Assertions.assertThat(beforeField.schema().field("f1").schema().parameters()).includes(
                MapAssert.entry("__debezium.source.column.type", "FLOAT4"),
                MapAssert.entry("__debezium.source.column.length", "8"),
                MapAssert.entry("__debezium.source.column.scale", "8"));
    }

    /**
     * Verifies that the key schema keeps the declared primary key column order
     * ({@code b, d, c, a}) both before and after a column is added to the table.
     *
     * @throws Exception if executing SQL or starting the connector fails
     */
    @Test
    @FixFor({"DBZ-3074"})
    public void shouldMaintainPrimaryKeyOrderOnSchemaChange() throws Exception {
        startConnector();
        this.consumer = testConsumer(1, new String[0]);
        executeAndWait("CREATE TABLE test_should_maintain_primary_key_order(b INTEGER, d INTEGER, c INTEGER, a INTEGER, val INTEGER, PRIMARY KEY (b, d, c, a));INSERT INTO test_should_maintain_primary_key_order VALUES (1, 2, 3, 4, 5);");
        String[] expectedKeyOrder = {"b", "d", "c", "a"};

        SourceRecord beforeAlter = this.consumer.remove();
        TestCase.assertEquals(1, ((Struct) beforeAlter.value()).getStruct("after").getInt32("b").intValue());
        assertKeyFieldNamesInOrder(expectedKeyOrder, beforeAlter);

        // Change the table schema and make sure the key order is unaffected.
        this.consumer.expects(1);
        executeAndWait("ALTER TABLE test_should_maintain_primary_key_order ADD COLUMN val2 INTEGER;INSERT INTO test_should_maintain_primary_key_order VALUES (10, 11, 12, 13, 14, 15);");
        SourceRecord afterAlter = this.consumer.remove();
        TestCase.assertEquals(10, ((Struct) afterAlter.value()).getStruct("after").getInt32("b").intValue());
        assertKeyFieldNamesInOrder(expectedKeyOrder, afterAlter);
    }

    /**
     * Asserts that the key schema of the given record has exactly the expected field names,
     * in the expected order.
     *
     * @param expectedNames the key field names in their expected order
     * @param sourceRecord the record whose key schema is checked
     */
    private static void assertKeyFieldNamesInOrder(String[] expectedNames, SourceRecord sourceRecord) {
        List<Field> keyFields = sourceRecord.keySchema().fields();
        // Without the size check a key schema with missing fields would pass vacuously,
        // and one with extra fields would fail with an index error instead of an assertion.
        TestCase.assertEquals("Unexpected number of key fields", expectedNames.length, keyFields.size());
        for (int i = 0; i < expectedNames.length; i++) {
            TestCase.assertEquals("Key field names should in order", expectedNames[i], keyFields.get(i).name());
        }
    }

    /**
     * Asserts that the consumer holds at least one record, removes the next one and
     * checks it with {@link #assertHeartBeatRecord(SourceRecord)}.
     */
    private void assertHeartBeatRecordInserted() {
        boolean nothingConsumed = this.consumer.isEmpty();
        Assert.assertFalse("records not generated", nothingConsumed);
        SourceRecord heartbeat = this.consumer.remove();
        assertHeartBeatRecord(heartbeat);
    }

    /**
     * Asserts that the given record is a heartbeat record: its topic is
     * {@code __debezium-heartbeat.test_server}, its key's {@code serverName} is
     * {@code test_server}, and its value's {@code ts_ms} is not later than the current time.
     *
     * @param sourceRecord the record to check
     */
    private void assertHeartBeatRecord(SourceRecord sourceRecord) {
        TestCase.assertEquals("__debezium-heartbeat.test_server", sourceRecord.topic());

        Struct key = (Struct) sourceRecord.key();
        Assertions.assertThat(key.get("serverName")).isEqualTo("test_server");

        Struct value = (Struct) sourceRecord.value();
        Assertions.assertThat(value.getInt64("ts_ms")).isLessThanOrEqualTo(Instant.now().toEpochMilli());
    }

    /**
     * Removes the next record from the consumer and checks whether it is a heartbeat record.
     *
     * <p>Fails if the consumer is empty. A record on any topic other than
     * {@code __debezium-heartbeat.test_server} is handed back to the caller unchecked; a record
     * on the heartbeat topic is validated with {@link #assertHeartBeatRecord(SourceRecord)}.
     *
     * @return the removed record if it is not on the heartbeat topic, or an empty
     *         {@code Optional} if it was a heartbeat record that passed validation
     */
    private Optional<SourceRecord> isHeartBeatRecordInserted() {
        Assert.assertFalse("records not generated", this.consumer.isEmpty());
        SourceRecord next = this.consumer.remove();
        if (!"__debezium-heartbeat.test_server".equals(next.topic())) {
            return Optional.of(next);
        }
        // Reuse the shared heartbeat assertions instead of duplicating them here.
        assertHeartBeatRecord(next);
        return Optional.empty();
    }

    /**
     * Executes the insert statement and verifies the resulting record without checking a
     * primary key value; delegates to the three-argument overload with a {@code null} key.
     *
     * @param str the insert statement to execute
     * @param list the expected schema and values of the record's {@code after} state
     */
    private void assertInsert(String str, List<AbstractRecordsProducerTest.SchemaAndValueField> list) {
        final Integer noPrimaryKey = null;
        assertInsert(str, noPrimaryKey, list);
    }

    /**
     * Executes the insert statement, waits for the resulting record and verifies its topic,
     * offset/snapshot source, source info and {@code after} state.
     *
     * <p>The expected topic name is built from the statement's schema and table, with spaces
     * and double quotes replaced by underscores. Any checked exception raised along the way
     * is rethrown wrapped in a {@link RuntimeException}.
     *
     * @param str the insert statement to execute
     * @param num the expected value of the {@code pk} key column, or {@code null} to skip the
     *            key check
     * @param list the expected schema and values of the record's {@code after} state
     */
    private void assertInsert(String str, Integer num, List<AbstractRecordsProducerTest.SchemaAndValueField> list) {
        TableId tableId = tableIdFromInsertStmt(str);
        String qualifiedName = tableId.schema() + "." + tableId.table();
        String expectedTopicName = qualifiedName.replaceAll("[ \"]", "_");
        // The key column is only named when a key value was supplied.
        String pkColumn = (num == null) ? null : "pk";
        try {
            executeAndWait(str);
            SourceRecord inserted = assertRecordInserted(expectedTopicName, pkColumn, num);
            assertRecordOffsetAndSnapshotSource(inserted, false, false);
            assertSourceInfo(inserted, "postgres", tableId.schema(), tableId.table());
            assertRecordSchemaAndValues(list, inserted, "after");
        } catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    /**
     * Asserts that the given record was published to the topic derived from {@code str} and
     * is a valid insert, additionally checking the key column when a key value is supplied.
     *
     * @param sourceRecord the record to verify
     * @param str the name passed to {@code TestHelper.topicName} to build the expected topic
     * @param str2 the key column name, used only when {@code num} is not {@code null}
     * @param num the expected key value, or {@code null} to skip the key check
     * @return the same record that was passed in
     * @throws InterruptedException declared by the signature; not thrown by the visible code
     */
    private SourceRecord assertRecordInserted(SourceRecord sourceRecord, String str, String str2, Integer num) throws InterruptedException {
        String expectedTopic = TestHelper.topicName(str);
        TestCase.assertEquals(expectedTopic, sourceRecord.topic());
        if (num == null) {
            VerifyRecord.isValidInsert(sourceRecord);
            return sourceRecord;
        }
        VerifyRecord.isValidInsert(sourceRecord, str2, num.intValue());
        return sourceRecord;
    }

    /**
     * Asserts that the consumer holds at least one record, removes the next one and verifies
     * it as an insert via the four-argument overload.
     *
     * @param str the name passed on to build the expected topic
     * @param str2 the key column name, used only when {@code num} is not {@code null}
     * @param num the expected key value, or {@code null} to skip the key check
     * @return the record removed from the consumer
     * @throws InterruptedException propagated from the four-argument overload's signature
     */
    private SourceRecord assertRecordInserted(String str, String str2, Integer num) throws InterruptedException {
        boolean nothingConsumed = this.consumer.isEmpty();
        Assert.assertFalse("records not generated", nothingConsumed);
        SourceRecord next = this.consumer.remove();
        return assertRecordInserted(next, str, str2, num);
    }

    /**
     * Executes the given SQL via {@code TestHelper.execute} and then calls the consumer's
     * {@code await} with a timeout of {@code TestHelper.waitTimeForRecords() * 30} seconds.
     *
     * @param str the SQL to execute
     * @throws Exception if executing the statement or awaiting the consumer fails
     */
    private void executeAndWait(String str) throws Exception {
        final String[] noFurtherStatements = new String[0];
        TestHelper.execute(str, noFurtherStatements);

        final TimeUnit unit = TimeUnit.SECONDS;
        this.consumer.await(TestHelper.waitTimeForRecords() * 30, unit);
    }

    /**
     * Executes the given SQL via {@code TestHelper.execute} and then calls the consumer's
     * {@code await} with a fixed timeout of 5 seconds.
     *
     * @param str the SQL to execute
     * @throws Exception if executing the statement or awaiting the consumer fails
     */
    private void executeAndWaitForNoRecords(String str) throws Exception {
        final String[] noFurtherStatements = new String[0];
        TestHelper.execute(str, noFurtherStatements);

        final TimeUnit unit = TimeUnit.SECONDS;
        this.consumer.await(5L, unit);
    }
}
