package io.debezium.connector.postgresql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.junit.SkipTestDependingOnDecoderPluginNameRule;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIs;
import io.debezium.connector.postgresql.junit.SkipWhenDecoderPluginNameIsNot;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenDatabaseVersion;
import io.debezium.junit.SkipWhenKafkaVersion;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.util.Strings;
import io.debezium.util.Testing;
import java.lang.management.ManagementFactory;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import javax.management.InstanceNotFoundException;
import junit.framework.TestCase;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.source.SourceRecord;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.fest.assertions.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import org.postgresql.util.PSQLState;

/* loaded from: input_file:io/debezium/connector/postgresql/PostgresConnectorIT.class */
public class PostgresConnectorIT extends AbstractConnectorTest {
    private static final String INSERT_STMT = "INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);";
    private static final String CREATE_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));";
    private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);";
    private PostgresConnector connector;

    @Rule
    public final TestRule skipName = new SkipTestDependingOnDecoderPluginNameRule();

    @BeforeClass
    public static void beforeClass() throws SQLException {
        TestHelper.dropAllSchemas();
    }

    @Before
    public void before() {
        initializeConnectorTestFramework();
    }

    @After
    public void after() {
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
    }

    @Test
    public void shouldValidateConnectorConfigDef() {
        this.connector = new PostgresConnector();
        Assertions.assertThat(this.connector.config()).isNotNull();
        PostgresConnectorConfig.ALL_FIELDS.forEach(this::validateFieldDef);
    }

    @Test
    public void shouldNotStartWithInvalidConfiguration() throws Exception {
        Configuration build = Configuration.create().build();
        this.logger.info("Attempting to start the connector with an INVALID configuration, so MULTIPLE error messages & one exceptions will appear in the log");
        start(PostgresConnector.class, build, (z, str, th) -> {
            Assertions.assertThat(z).isFalse();
            Assertions.assertThat(th).isNotNull();
        });
        assertConnectorNotRunning();
    }

    @Test
    public void shouldValidateMinimalConfiguration() throws Exception {
        new PostgresConnector().validate(TestHelper.defaultConfig().build().asMap()).configValues().forEach(configValue -> {
            Assert.assertTrue("Unexpected error for: " + configValue.name(), configValue.errorMessages().isEmpty());
        });
    }

    @Test
    public void shouldNotStartWithInvalidSlotConfigAndUserRoles() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.dropPublication();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute("CREATE USER badboy WITH PASSWORD 'failing';", "GRANT ALL PRIVILEGES ON DATABASE postgres TO badboy;");
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER).with(PostgresConnectorConfig.SLOT_NAME, "debezium").build());
        waitForStreamingRunning();
        List configValues = new PostgresConnector().validate(TestHelper.defaultConfig().with("name", "failingPGConnector").with("database." + JdbcConfiguration.USER, "badboy").with("database." + JdbcConfiguration.PASSWORD, "failing").with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER).with(PostgresConnectorConfig.SLOT_NAME, "debezium").build().asMap()).configValues();
        List singletonList = Collections.singletonList("database.user");
        configValues.forEach(configValue -> {
            if (singletonList.contains(configValue.name())) {
                return;
            }
            Assert.assertTrue("Unexpected error for \"" + configValue.name() + "\": " + configValue.errorMessages(), configValue.errorMessages().isEmpty());
        });
        stopConnector();
    }

    @Test
    public void shouldValidateConfiguration() throws Exception {
        Config validate = new PostgresConnector().validate(Configuration.create().build().asMap());
        assertConfigurationErrors(validate, PostgresConnectorConfig.HOSTNAME, 1);
        assertConfigurationErrors(validate, PostgresConnectorConfig.USER, 1);
        assertConfigurationErrors(validate, PostgresConnectorConfig.DATABASE_NAME, 1);
        assertConfigurationErrors(validate, PostgresConnectorConfig.SERVER_NAME, 1);
        validateField(validate, PostgresConnectorConfig.PLUGIN_NAME, PostgresConnectorConfig.LogicalDecoder.DECODERBUFS.getValue());
        validateField(validate, PostgresConnectorConfig.SLOT_NAME, "debezium");
        validateField(validate, PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
        validateField(validate, PostgresConnectorConfig.PORT, 5432);
        validateField(validate, PostgresConnectorConfig.MAX_QUEUE_SIZE, 8192);
        validateField(validate, PostgresConnectorConfig.MAX_BATCH_SIZE, 2048);
        validateField(validate, PostgresConnectorConfig.SNAPSHOT_FETCH_SIZE, null);
        validateField(validate, PostgresConnectorConfig.POLL_INTERVAL_MS, 500L);
        validateField(validate, PostgresConnectorConfig.SSL_MODE, PostgresConnectorConfig.SecureConnectionMode.DISABLED);
        validateField(validate, PostgresConnectorConfig.SSL_CLIENT_CERT, null);
        validateField(validate, PostgresConnectorConfig.SSL_CLIENT_KEY, null);
        validateField(validate, PostgresConnectorConfig.SSL_CLIENT_KEY_PASSWORD, null);
        validateField(validate, PostgresConnectorConfig.SSL_ROOT_CERT, null);
        validateField(validate, PostgresConnectorConfig.SCHEMA_WHITELIST, null);
        validateField(validate, PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, null);
        validateField(validate, PostgresConnectorConfig.SCHEMA_BLACKLIST, null);
        validateField(validate, PostgresConnectorConfig.SCHEMA_EXCLUDE_LIST, null);
        validateField(validate, PostgresConnectorConfig.TABLE_WHITELIST, null);
        validateField(validate, PostgresConnectorConfig.TABLE_INCLUDE_LIST, null);
        validateField(validate, PostgresConnectorConfig.TABLE_BLACKLIST, null);
        validateField(validate, PostgresConnectorConfig.TABLE_EXCLUDE_LIST, null);
        validateField(validate, PostgresConnectorConfig.COLUMN_BLACKLIST, null);
        validateField(validate, PostgresConnectorConfig.COLUMN_EXCLUDE_LIST, null);
        validateField(validate, PostgresConnectorConfig.COLUMN_WHITELIST, null);
        validateField(validate, PostgresConnectorConfig.COLUMN_INCLUDE_LIST, null);
        validateField(validate, PostgresConnectorConfig.MSG_KEY_COLUMNS, null);
        validateField(validate, PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL);
        validateField(validate, RelationalDatabaseConnectorConfig.SNAPSHOT_LOCK_TIMEOUT_MS, Long.valueOf(RelationalDatabaseConnectorConfig.DEFAULT_SNAPSHOT_LOCK_TIMEOUT_MILLIS));
        validateField(validate, PostgresConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE);
        validateField(validate, PostgresConnectorConfig.DECIMAL_HANDLING_MODE, RelationalDatabaseConnectorConfig.DecimalHandlingMode.PRECISE);
        validateField(validate, PostgresConnectorConfig.SSL_SOCKET_FACTORY, null);
        validateField(validate, PostgresConnectorConfig.TCP_KEEPALIVE, true);
    }

    @Test
    public void shouldValidateReplicationSlotName() throws Exception {
        assertConfigurationErrors(new PostgresConnector().validate(Configuration.create().with(PostgresConnectorConfig.SLOT_NAME, "xx-aa").build().asMap()), PostgresConnectorConfig.SLOT_NAME, 1);
    }

    @Test
    public void shouldSupportSSLParameters() throws Exception {
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SSL_MODE, PostgresConnectorConfig.SecureConnectionMode.REQUIRED).build(), (z, str, th) -> {
            if (TestHelper.shouldSSLConnectionFail()) {
                Assertions.assertThat(z).isFalse();
                Assertions.assertThat(th).isInstanceOf(ConnectException.class);
                Throwable cause = th.getCause();
                Assertions.assertThat(cause).isInstanceOf(SQLException.class);
                Assertions.assertThat(PSQLState.CONNECTION_REJECTED.getState().equals(((SQLException) cause).getSQLState()));
            }
        });
        if (TestHelper.shouldSSLConnectionFail()) {
            assertConnectorNotRunning();
            return;
        }
        assertConnectorIsRunning();
        Thread.sleep(10000L);
        stopConnector();
    }

    @Test
    public void shouldProduceEventsWithInitialSnapshot() throws Exception {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        Configuration.Builder with = TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
        start(PostgresConnector.class, with.build());
        assertConnectorIsRunning();
        assertRecordsFromSnapshot(2, 1, 1);
        TestHelper.execute(INSERT_STMT, new String[0]);
        assertRecordsAfterInsert(2, 2, 2);
        stopConnector();
        assertNoRecordsToConsume();
        TestHelper.execute(INSERT_STMT, new String[0]);
        start(PostgresConnector.class, with.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
        assertConnectorIsRunning();
        assertRecordsAfterInsert(2, 3, 3);
    }

    @Test
    @FixFor({"DBZ-1174"})
    public void shouldUseMicrosecondsForTransactionCommitTime() throws InterruptedException {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(CommonConnectorConfig.SOURCE_STRUCT_MAKER_VERSION, CommonConnectorConfig.Version.V1).build());
        assertConnectorIsRunning();
        long micros = TimeUnit.SECONDS.toMicros(Instant.now().getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(r0.getNano());
        consumeRecordsByTopic(2).forEach(sourceRecord -> {
            assertSourceInfoMicrosecondTransactionTimestamp(sourceRecord, micros, TimeUnit.MINUTES.toMicros(1L));
        });
        TestHelper.execute(INSERT_STMT, new String[0]);
        long micros2 = TimeUnit.SECONDS.toMicros(Instant.now().getEpochSecond()) + TimeUnit.NANOSECONDS.toMicros(r0.getNano());
        consumeRecordsByTopic(2).forEach(sourceRecord2 -> {
            assertSourceInfoMicrosecondTransactionTimestamp(sourceRecord2, micros2, TimeUnit.MINUTES.toMicros(1L));
        });
        stopConnector();
        assertNoRecordsToConsume();
    }

    @Test
    @FixFor({"DBZ-1235"})
    public void shouldUseMillisecondsForTransactionCommitTime() throws InterruptedException {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().build());
        assertConnectorIsRunning();
        long millis = TimeUnit.SECONDS.toMillis(Instant.now().getEpochSecond()) + TimeUnit.NANOSECONDS.toMillis(r0.getNano());
        consumeRecordsByTopic(2).forEach(sourceRecord -> {
            assertSourceInfoMillisecondTransactionTimestamp(sourceRecord, millis, TimeUnit.MINUTES.toMillis(1L));
        });
        TestHelper.execute(INSERT_STMT, new String[0]);
        long millis2 = TimeUnit.SECONDS.toMillis(Instant.now().getEpochSecond()) + TimeUnit.NANOSECONDS.toMillis(r0.getNano());
        consumeRecordsByTopic(2).forEach(sourceRecord2 -> {
            assertSourceInfoMillisecondTransactionTimestamp(sourceRecord2, millis2, TimeUnit.MINUTES.toMillis(1L));
        });
        stopConnector();
        assertNoRecordsToConsume();
    }

    @Test
    @FixFor({"DBZ-1161"})
    public void shouldConsumeMessagesFromSnapshot() throws Exception {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        for (int i = 0; i < 99; i++) {
            TestHelper.execute(INSERT_STMT, new String[0]);
        }
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 50).with(PostgresConnectorConfig.MAX_BATCH_SIZE, 10).with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        Assertions.assertThat(consumeRecordsByTopic(100).recordsForTopic("test_server.s1.a")).hasSize(100);
        stopConnector();
    }

    @Test
    public void shouldConsumeMessagesFromSnapshotOld() throws Exception {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        for (int i = 0; i < 99; i++) {
            TestHelper.execute(INSERT_STMT, new String[0]);
        }
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 50).with(PostgresConnectorConfig.MAX_BATCH_SIZE, 10).with(PostgresConnectorConfig.SCHEMA_WHITELIST, "s1").build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        Assertions.assertThat(consumeRecordsByTopic(100).recordsForTopic("test_server.s1.a")).hasSize(100);
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-997"})
    public void shouldReceiveChangesForChangePKColumnDefinition() throws Exception {
        Testing.Print.enable();
        String str = "pkcolumndef" + new Random().nextInt(100);
        TestHelper.create().dropReplicationSlot(str);
        try {
            PostgresConnectorConfig postgresConnectorConfig = new PostgresConnectorConfig(TestHelper.defaultConfig().with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, Boolean.FALSE).with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "changepk").with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).with(PostgresConnectorConfig.SLOT_NAME, str).build());
            String str2 = TestHelper.topicName("changepk.test_table");
            TestHelper.execute("CREATE SCHEMA IF NOT EXISTS changepk;", "DROP TABLE IF EXISTS changepk.test_table;", "CREATE TABLE changepk.test_table (pk SERIAL, text TEXT, PRIMARY KEY(pk));", "INSERT INTO changepk.test_table(text) VALUES ('insert');");
            start(PostgresConnector.class, postgresConnectorConfig.getConfig());
            assertConnectorIsRunning();
            consumeRecordsByTopic(1);
            TestHelper.execute("ALTER TABLE changepk.test_table DROP CONSTRAINT test_table_pkey;ALTER TABLE changepk.test_table RENAME COLUMN pk TO newpk;ALTER TABLE changepk.test_table ADD PRIMARY KEY(newpk);INSERT INTO changepk.test_table VALUES(2, 'newpkcol')", new String[0]);
            SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic(1).recordsForTopic(str2).get(0);
            TestCase.assertEquals(str2, sourceRecord.topic());
            VerifyRecord.isValidInsert(sourceRecord, "newpk", 2);
            TestHelper.execute("ALTER TABLE changepk.test_table ADD COLUMN pk2 SERIAL;ALTER TABLE changepk.test_table DROP CONSTRAINT test_table_pkey;ALTER TABLE changepk.test_table ADD PRIMARY KEY(newpk,pk2);INSERT INTO changepk.test_table VALUES(3, 'newpkcol', 8)", new String[0]);
            SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic(1).recordsForTopic(str2).get(0);
            TestCase.assertEquals(str2, sourceRecord2.topic());
            VerifyRecord.isValidInsert(sourceRecord2, "newpk", 3);
            VerifyRecord.isValidInsert(sourceRecord2, "pk2", 8);
            stopConnector();
            TestHelper.execute("INSERT INTO changepk.test_table VALUES(4, 'newpkcol', 20)", new String[0]);
            TestHelper.execute("ALTER TABLE changepk.test_table DROP CONSTRAINT test_table_pkey;ALTER TABLE changepk.test_table DROP COLUMN pk2;ALTER TABLE changepk.test_table ADD COLUMN pk3 SERIAL;ALTER TABLE changepk.test_table ADD PRIMARY KEY(newpk,pk3);INSERT INTO changepk.test_table VALUES(5, 'dropandaddpkcol',10)", new String[0]);
            start(PostgresConnector.class, postgresConnectorConfig.getConfig());
            AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
            SourceRecord sourceRecord3 = (SourceRecord) consumeRecordsByTopic.recordsForTopic(str2).get(0);
            TestCase.assertEquals(str2, sourceRecord3.topic());
            VerifyRecord.isValidInsert(sourceRecord3, "newpk", 4);
            Struct struct = (Struct) sourceRecord3.key();
            Assertions.assertThat(struct.schema().field("pk2")).isNull();
            Assertions.assertThat(struct.schema().field("pk3")).isNull();
            SourceRecord sourceRecord4 = (SourceRecord) consumeRecordsByTopic.recordsForTopic(str2).get(1);
            TestCase.assertEquals(str2, sourceRecord4.topic());
            VerifyRecord.isValidInsert(sourceRecord4, "newpk", 5);
            VerifyRecord.isValidInsert(sourceRecord4, "pk3", 10);
            Assertions.assertThat(((Struct) sourceRecord4.key()).schema().field("pk2")).isNull();
            stopConnector();
            TestHelper.create().dropReplicationSlot(str);
            TestHelper.execute("DROP SCHEMA IF EXISTS changepk CASCADE;", new String[0]);
        } catch (Throwable th) {
            stopConnector(null);
            TestHelper.create().dropReplicationSlot(str);
            throw th;
        }
    }

    @Test
    @SkipWhenDecoderPluginNameIs(value = SkipWhenDecoderPluginNameIs.DecoderPluginName.PGOUTPUT, reason = "Pgoutput will generate insert statements even for dropped tables, column optionality will default to true however")
    @FixFor({"DBZ-1021"})
    public void shouldIgnoreEventsForDeletedTable() throws Exception {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        Configuration.Builder with = TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
        start(PostgresConnector.class, with.build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        assertRecordsFromSnapshot(2, 1, 1);
        waitForStreamingRunning();
        TestHelper.execute(INSERT_STMT, new String[0]);
        assertRecordsAfterInsert(2, 2, 2);
        stopConnector();
        assertNoRecordsToConsume();
        TestHelper.execute(INSERT_STMT, new String[0]);
        TestHelper.execute("DROP TABLE s1.a", new String[0]);
        start(PostgresConnector.class, with.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
        assertConnectorIsRunning();
        waitForStreamingRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.topics()).hasSize(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"))).hasSize(1);
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
    }

    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Pgoutput will generate insert statements even for dropped tables, column optionality will default to true however")
    @FixFor({"DBZ-1021"})
    public void shouldNotIgnoreEventsForDeletedTable() throws Exception {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        Configuration.Builder with = TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
        start(PostgresConnector.class, with.build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        assertRecordsFromSnapshot(2, 1, 1);
        waitForStreamingRunning();
        TestHelper.execute(INSERT_STMT, new String[0]);
        assertRecordsAfterInsert(2, 2, 2);
        stopConnector();
        assertNoRecordsToConsume();
        TestHelper.execute(INSERT_STMT, new String[0]);
        TestHelper.execute("DROP TABLE s1.a", new String[0]);
        start(PostgresConnector.class, with.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
        assertConnectorIsRunning();
        waitForStreamingRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic.topics()).hasSize(2);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"))).hasSize(1);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"))).hasSize(1);
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
    }

    @Test
    public void shouldIgnoreViews() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);CREATE VIEW s1.myview AS SELECT * from s1.a;", new String[0]);
        Configuration.Builder with = TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
        start(PostgresConnector.class, with.build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        assertRecordsFromSnapshot(2, 1, 1);
        waitForStreamingRunning();
        TestHelper.execute(INSERT_STMT, new String[0]);
        assertRecordsAfterInsert(2, 2, 2);
        stopConnector();
        assertNoRecordsToConsume();
        TestHelper.execute(INSERT_STMT, new String[0]);
        start(PostgresConnector.class, with.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
        assertConnectorIsRunning();
        waitForStreamingRunning();
        assertRecordsAfterInsert(2, 3, 3);
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
    }

    @Test
    @FixFor({"DBZ-693"})
    public void shouldExecuteOnConnectStatements() throws Exception {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.ON_CONNECT_STATEMENTS, "INSERT INTO s1.a (aa) VALUES (2); INSERT INTO s2.a (aa, bb) VALUES (2, 'hello;; world');").with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
        waitForStreamingRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(6);
        assertKey((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0), "pk", 1);
        assertKey((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(1), "pk", 2);
        assertValueField((SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(5), "after/bb", "hello; world");
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
    }

    @Test
    public void shouldProduceEventsWhenSnapshotsAreNeverAllowed() throws InterruptedException {
        Testing.Print.enable();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
        assertConnectorIsRunning();
        TestHelper.waitForDefaultReplicationSlotBeActive();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        assertNoRecordsToConsume();
        TestHelper.execute(INSERT_STMT, new String[0]);
        assertRecordsAfterInsert(2, 2, 2);
    }

    @Test
    public void shouldNotProduceEventsWithInitialOnlySnapshot() throws InterruptedException {
        Testing.Print.enable();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL_ONLY.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
        assertConnectorIsRunning();
        assertRecordsFromSnapshot(2, 1, 1);
        TestHelper.execute(INSERT_STMT, new String[0]);
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        assertNoRecordsToConsume();
    }

    @Test
    public void shouldProduceEventsWhenAlwaysTakingSnapshots() throws InterruptedException {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        Configuration.Builder with = TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.ALWAYS.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
        start(PostgresConnector.class, with.build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        assertRecordsFromSnapshot(2, 1, 1);
        TestHelper.execute(INSERT_STMT, new String[0]);
        assertRecordsAfterInsert(2, 2, 2);
        stopConnector();
        assertNoRecordsToConsume();
        start(PostgresConnector.class, with.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        assertRecordsFromSnapshot(4, 1, 2, 1, 2);
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
    }

    @Test
    public void shouldResumeSnapshotIfFailingMidstream() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);", new String[0]);
        Configuration.Builder with = TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
        start(PostgresConnector.class, with.build(), (z, str, th) -> {
            if (th != null) {
                countDownLatch.countDown();
            } else {
                Assert.fail("A controlled exception was expected....");
            }
        }, stopOnPKPredicate(2));
        if (!countDownLatch.await(TestHelper.waitTimeForRecords() * 5, TimeUnit.SECONDS)) {
            Assert.fail("did not reach stop condition in time");
        }
        assertConnectorNotRunning();
        consumeAvailableRecords(sourceRecord -> {
        });
        stopConnector();
        assertNoRecordsToConsume();
        start(PostgresConnector.class, with.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
        assertConnectorIsRunning();
        assertRecordsFromSnapshot(4, 1, 2, 1, 2);
        TestHelper.execute(INSERT_STMT, new String[0]);
        assertRecordsAfterInsert(2, 3, 3);
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
    }

    @Test
    @FixFor({"DBZ-1857"})
    @SkipWhenDatabaseVersion(check = EqualityCheck.LESS_THAN, major = 10, reason = "Database version less than 10.0")
    public void shouldRecoverFromRetriableException() throws Exception {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
        waitForStreamingRunning("postgres", "test_server");
        assertRecordsFromSnapshot(2, 1, 1);
        TestHelper.execute("SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE backend_type='walsender'", new String[0]);
        TestHelper.execute(INSERT_STMT, new String[0]);
        assertRecordsAfterInsert(2, 2, 2);
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
    }

    @Test
    public void shouldTakeExcludeListFiltersIntoAccount() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);CREATE TABLE s1.b (pk SERIAL, aa integer, bb integer, PRIMARY KEY(pk));ALTER TABLE s1.a ADD COLUMN bb integer;INSERT INTO s1.a (aa, bb) VALUES (2, 2);INSERT INTO s1.a (aa, bb) VALUES (3, 3);INSERT INTO s1.b (aa, bb) VALUES (4, 4);INSERT INTO s2.a (aa) VALUES (5);", new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.SCHEMA_EXCLUDE_LIST, "s2").with(PostgresConnectorConfig.TABLE_EXCLUDE_LIST, ".+b").with(PostgresConnectorConfig.COLUMN_EXCLUDE_LIST, ".+bb").build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(4);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"))).isNullOrEmpty();
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.b"))).isNullOrEmpty();
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(3);
        AtomicInteger atomicInteger = new AtomicInteger(1);
        recordsForTopic.forEach(sourceRecord -> {
            VerifyRecord.isValidRead(sourceRecord, "pk", atomicInteger.getAndIncrement());
            assertFieldAbsent(sourceRecord, "bb");
        });
        TestHelper.execute("INSERT INTO s1.b (aa, bb) VALUES (6, 6);INSERT INTO s2.a (aa) VALUES (7);", new String[0]);
        assertNoRecordsToConsume();
    }

    @Test
    public void shouldTakeBlacklistFiltersIntoAccount() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);CREATE TABLE s1.b (pk SERIAL, aa integer, bb integer, PRIMARY KEY(pk));ALTER TABLE s1.a ADD COLUMN bb integer;INSERT INTO s1.a (aa, bb) VALUES (2, 2);INSERT INTO s1.a (aa, bb) VALUES (3, 3);INSERT INTO s1.b (aa, bb) VALUES (4, 4);INSERT INTO s2.a (aa) VALUES (5);", new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.SCHEMA_BLACKLIST, "s2").with(PostgresConnectorConfig.TABLE_BLACKLIST, ".+b").with(PostgresConnectorConfig.COLUMN_BLACKLIST, ".+bb").build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(4);
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"))).isNullOrEmpty();
        Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.b"))).isNullOrEmpty();
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(3);
        AtomicInteger atomicInteger = new AtomicInteger(1);
        recordsForTopic.forEach(sourceRecord -> {
            VerifyRecord.isValidRead(sourceRecord, "pk", atomicInteger.getAndIncrement());
            assertFieldAbsent(sourceRecord, "bb");
        });
        TestHelper.execute("INSERT INTO s1.b (aa, bb) VALUES (6, 6);INSERT INTO s2.a (aa) VALUES (7);", new String[0]);
        assertNoRecordsToConsume();
    }

    @Test
    @FixFor({"DBZ-1962"})
    public void shouldTakeColumnWhitelistFilterIntoAccount() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);ALTER TABLE s1.a ADD COLUMN bb integer;ALTER TABLE s1.a ADD COLUMN cc char(12);INSERT INTO s1.a (aa, bb) VALUES (2, 2);", new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with("column.mask.with.5.chars", ".+cc").with(PostgresConnectorConfig.COLUMN_INCLUDE_LIST, ".+aa,.+cc").build());
        assertConnectorIsRunning();
        consumeRecordsByTopic(1).recordsForTopic(TestHelper.topicName("s1.a")).forEach(sourceRecord -> {
            assertFieldAbsent(sourceRecord, "bb");
            Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("after").getString("cc")).isEqualTo("*****");
        });
    }

    @Test
    @FixFor({"DBZ-1546"})
    public void shouldRemoveWhiteSpaceChars() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);CREATE TABLE s1.b (pk SERIAL, aa integer, PRIMARY KEY(pk));INSERT INTO s1.b (aa) VALUES (123);", new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1").with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a, s1.b").build());
        assertConnectorIsRunning();
        List recordsForTopic = consumeRecordsByTopic(2).recordsForTopic(TestHelper.topicName("s1.b"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
        VerifyRecord.isValidRead(sourceRecord, "pk", 1);
        Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("source").getString("table")).isEqualTo("b");
    }

    @Test
    public void shouldRemoveWhiteSpaceCharsOld() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);CREATE TABLE s1.b (pk SERIAL, aa integer, PRIMARY KEY(pk));INSERT INTO s1.b (aa) VALUES (123);", new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.SCHEMA_WHITELIST, "s1").with(PostgresConnectorConfig.TABLE_WHITELIST, "s1.a, s1.b").build());
        assertConnectorIsRunning();
        List recordsForTopic = consumeRecordsByTopic(2).recordsForTopic(TestHelper.topicName("s1.b"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
        VerifyRecord.isValidRead(sourceRecord, "pk", 1);
        Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("source").getString("table")).isEqualTo("b");
    }

    @Test
    @FixFor({"DBZ-2118"})
    public void shouldCloseTxAfterTypeQuery() throws Exception {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1").with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.b").with(PostgresConnectorConfig.INCLUDE_UNKNOWN_DATATYPES, true).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        TestHelper.execute("CREATE TABLE s1.b (pk SERIAL, aa isbn, PRIMARY KEY(pk));", "INSERT INTO s1.b (aa) VALUES ('978-0-393-04002-9')");
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(TestHelper.topicName("s1.b"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
        VerifyRecord.isValidInsert(sourceRecord, "pk", 1);
        Assertions.assertThat(new String(((Struct) sourceRecord.value()).getStruct("after").getBytes("aa"))).isEqualTo("0-393-04002-X");
        TestHelper.assertNoOpenTransactions();
        stopConnector();
        assertConnectorNotRunning();
    }

    @Test
    @FixFor({"DBZ-878"})
    public void shouldReplaceInvalidTopicNameCharacters() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);CREATE TABLE s1.\"dbz_878_some|test@data\" (pk SERIAL, aa integer, PRIMARY KEY(pk));INSERT INTO s1.\"dbz_878_some|test@data\" (aa) VALUES (123);", new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1").with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1\\.dbz_878_some\\|test@data").build());
        assertConnectorIsRunning();
        List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(TestHelper.topicName("s1.dbz_878_some_test_data"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.get(0);
        VerifyRecord.isValidRead(sourceRecord, "pk", 1);
        Assertions.assertThat(((Struct) sourceRecord.value()).getStruct("source").getString("table")).isEqualTo("dbz_878_some|test@data");
    }

    @Test
    @FixFor({"DBZ-1245"})
    public void shouldNotSendEmptyOffset() throws InterruptedException, SQLException {
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a").with(Heartbeat.HEARTBEAT_INTERVAL, 10).build());
        assertConnectorIsRunning();
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));", new String[0]);
        waitForAvailableRecords(1000L, TimeUnit.MILLISECONDS);
        SourceRecord consumeRecord = consumeRecord();
        Assertions.assertThat(consumeRecord == null || !consumeRecord.sourceOffset().isEmpty());
    }

    @Test
    @FixFor({"DBZ-965"})
    public void shouldRegularlyFlushLsn() throws InterruptedException, SQLException {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a").build());
        assertConnectorIsRunning();
        waitForStreamingRunning("postgres", "test_server");
        assertNoRecordsToConsume();
        HashSet hashSet = new HashSet();
        PostgresConnection create = TestHelper.create();
        Throwable th = null;
        try {
            try {
                hashSet.add(getConfirmedFlushLsn(create));
                for (int i = 2; i <= 12; i++) {
                    TestHelper.execute(INSERT_STMT, new String[0]);
                    AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
                    Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(1);
                    Assertions.assertThat(consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a")).size()).isEqualTo(1);
                    try {
                        Awaitility.await().atMost(2L, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
                            return Boolean.valueOf(hashSet.add(getConfirmedFlushLsn(create)));
                        });
                    } catch (ConditionTimeoutException e) {
                    }
                }
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                Assertions.assertThat(hashSet.size()).isGreaterThanOrEqualTo(7);
            } finally {
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    @Test
    @FixFor({"DBZ-2660"})
    public void shouldRegularlyFlushLsnWithTxMonitoring() throws InterruptedException, SQLException {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        Configuration build = TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a").with(PostgresConnectorConfig.PROVIDE_TRANSACTION_METADATA, true).build();
        start(PostgresConnector.class, build);
        assertConnectorIsRunning();
        waitForStreamingRunning("postgres", "test_server");
        assertNoRecordsToConsume();
        String str = TestHelper.topicName("transaction");
        TestHelper.execute(INSERT_STMT, new String[0]);
        AbstractConnectorTest.SourceRecords consumeDmlRecordsByTopic = consumeDmlRecordsByTopic(1);
        Assertions.assertThat(consumeDmlRecordsByTopic.topics().size()).isEqualTo(2);
        Assertions.assertThat(consumeDmlRecordsByTopic.recordsForTopic(str).size()).isGreaterThanOrEqualTo(2);
        Assertions.assertThat(((SourceRecord) consumeDmlRecordsByTopic.recordsForTopic(str).get(1)).sourceOffset().containsKey("lsn_commit")).isTrue();
        stopConnector();
        assertConnectorNotRunning();
        start(PostgresConnector.class, build);
        assertConnectorIsRunning();
        waitForStreamingRunning("postgres", "test_server");
        assertOnlyTransactionRecordsToConsume();
        HashSet hashSet = new HashSet();
        PostgresConnection create = TestHelper.create();
        Throwable th = null;
        try {
            try {
                hashSet.add(getConfirmedFlushLsn(create));
                for (int i = 2; i <= 12; i++) {
                    TestHelper.execute(INSERT_STMT, new String[0]);
                    AbstractConnectorTest.SourceRecords consumeDmlRecordsByTopic2 = consumeDmlRecordsByTopic(1);
                    Assertions.assertThat(consumeDmlRecordsByTopic2.topics().size()).isEqualTo(2);
                    Assertions.assertThat(consumeDmlRecordsByTopic2.recordsForTopic(str).size()).isGreaterThanOrEqualTo(2);
                    Assertions.assertThat(consumeDmlRecordsByTopic2.recordsForTopic(TestHelper.topicName("s1.a")).size()).isEqualTo(1);
                    try {
                        Awaitility.await().atMost(2L, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
                            return Boolean.valueOf(hashSet.add(getConfirmedFlushLsn(create)));
                        });
                    } catch (ConditionTimeoutException e) {
                    }
                }
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                Assertions.assertThat(hashSet.size()).isGreaterThanOrEqualTo(7);
            } finally {
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.WAL2JSON, reason = "Only wal2json decoder emits empty events and passes them to streaming source")
    @FixFor({"DBZ-892"})
    public void shouldFlushLsnOnEmptyMessage() throws InterruptedException, SQLException {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s1.a").with(Heartbeat.HEARTBEAT_INTERVAL, 1000).build());
        assertConnectorIsRunning();
        waitForStreamingRunning("postgres", "test_server");
        assertNoRecordsToConsume();
        HashSet hashSet = new HashSet();
        TestHelper.execute(INSERT_STMT, new String[0]);
        Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
            List recordsForTopic = consumeRecordsByTopic(1).recordsForTopic(TestHelper.topicName("s1.a"));
            return Boolean.valueOf(recordsForTopic != null && recordsForTopic.size() == 1);
        });
        PostgresConnection create = TestHelper.create();
        Throwable th = null;
        try {
            try {
                hashSet.add(getConfirmedFlushLsn(create));
                for (int i = 0; i < 10; i++) {
                    TestHelper.execute("CREATE TEMPORARY TABLE xx(id INT);", new String[0]);
                    try {
                        Awaitility.await().atMost(5L, TimeUnit.SECONDS).ignoreExceptions().until(() -> {
                            return Boolean.valueOf(hashSet.add(getConfirmedFlushLsn(create)));
                        });
                    } catch (ConditionTimeoutException e) {
                    }
                }
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
                Assertions.assertThat(hashSet.size()).isGreaterThanOrEqualTo(7);
            } finally {
            }
        } catch (Throwable th3) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
            throw th3;
        }
    }

    @Test
    @FixFor({"DBZ-1082"})
    public void shouldAllowForCustomSnapshot() throws InterruptedException {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic2).isNull();
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic.get(0), "pk", 1);
        TestHelper.execute(INSERT_STMT, new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
        List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic3.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic4.size()).isEqualTo(1);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic3.get(0), "pk", 2);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic4.get(0), "pk", 2);
        stopConnector();
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomTestSnapshot.class.getName()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(4);
        List recordsForTopic5 = consumeRecordsByTopic3.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic6 = consumeRecordsByTopic3.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic5.size()).isEqualTo(2);
        Assertions.assertThat(recordsForTopic6.size()).isEqualTo(2);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic5.get(0), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic5.get(1), "pk", 2);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic6.get(0), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic6.get(1), "pk", 2);
    }

    @Test
    @FixFor({"DBZ-2456"})
    public void shouldAllowForSelectiveSnapshot() throws InterruptedException {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        Configuration.Builder with = TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.ALWAYS.name()).with(CommonConnectorConfig.SNAPSHOT_MODE_TABLES, "s1.a").with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
        start(PostgresConnector.class, with.build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic2).isNull();
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic.get(0), "pk", 1);
        TestHelper.execute(INSERT_STMT, new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
        List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic3.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic4.size()).isEqualTo(1);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic3.get(0), "pk", 2);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic4.get(0), "pk", 2);
        stopConnector();
        start(PostgresConnector.class, with.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.SNAPSHOT_MODE_TABLES, "s2.a").build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(2);
        List recordsForTopic5 = consumeRecordsByTopic3.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic6 = consumeRecordsByTopic3.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic6.size()).isEqualTo(2);
        Assertions.assertThat(recordsForTopic5).isNull();
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic6.get(0), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic6.get(1), "pk", 2);
    }

    @Test
    @FixFor({"DBZ-1035"})
    public void shouldAllowForExportedSnapshot() throws Exception {
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.EXPORTED.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic.get(0), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic2.get(0), "pk", 1);
        TestHelper.execute(INSERT_STMT, new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(2);
        List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic3.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic4.size()).isEqualTo(1);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic3.get(0), "pk", 2);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic4.get(0), "pk", 2);
        stopConnector();
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.EXPORTED.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
        assertConnectorIsRunning();
        TestHelper.execute(INSERT_STMT, new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(2);
        List recordsForTopic5 = consumeRecordsByTopic3.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic6 = consumeRecordsByTopic3.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic5.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic6.size()).isEqualTo(1);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic5.get(0), "pk", 3);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic6.get(0), "pk", 3);
    }

    @Test
    @SkipWhenDecoderPluginNameIs(value = SkipWhenDecoderPluginNameIs.DecoderPluginName.PGOUTPUT, reason = "PgOutput needs publication for manually created slot")
    @FixFor({"DBZ-2288"})
    public void exportedSnapshotShouldNotSkipRecordOfParallelTx() throws Exception {
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.createDefaultReplicationSlot();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        TestHelper.execute(INSERT_STMT, new String[0]);
        Configuration build = TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.EXPORTED.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 2).with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1).build();
        PostgresConnection create = TestHelper.create();
        create.setAutoCommit(false);
        create.executeWithoutCommitting(new String[]{INSERT_STMT});
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        start(PostgresConnector.class, build, loggingCompletion(), sourceRecord -> {
            return false;
        }, sourceRecord2 -> {
            if (atomicBoolean.get()) {
                return;
            }
            TestHelper.execute(INSERT_STMT, new String[0]);
            try {
                create.commit();
                atomicBoolean.set(true);
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        });
        assertConnectorIsRunning();
        consumeRecordsByTopic(4);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(4);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(2);
        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(2);
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
    }

    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication not supported")
    @FixFor({"DBZ-2288"})
    public void exportedSnapshotShouldNotSkipRecordOfParallelTxPgoutput() throws Exception {
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.createDefaultReplicationSlot();
        TestHelper.execute("CREATE PUBLICATION dbz_publication FOR ALL TABLES;", new String[0]);
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        TestHelper.execute(INSERT_STMT, new String[0]);
        Configuration build = TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.EXPORTED.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).with(PostgresConnectorConfig.MAX_QUEUE_SIZE, 2).with(PostgresConnectorConfig.MAX_BATCH_SIZE, 1).build();
        PostgresConnection create = TestHelper.create();
        create.setAutoCommit(false);
        create.executeWithoutCommitting(new String[]{INSERT_STMT});
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        start(PostgresConnector.class, build, loggingCompletion(), sourceRecord -> {
            return false;
        }, sourceRecord2 -> {
            if (atomicBoolean.get()) {
                return;
            }
            TestHelper.execute(INSERT_STMT, new String[0]);
            try {
                create.commit();
                atomicBoolean.set(true);
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
        });
        assertConnectorIsRunning();
        consumeRecordsByTopic(4);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(4);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(2);
        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(2);
        stopConnector();
        TestHelper.dropPublication();
        TestHelper.dropDefaultReplicationSlot();
    }

    @Test
    @FixFor({"DBZ-1437"})
    public void shouldPeformSnapshotOnceForInitialOnlySnapshotMode() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL_ONLY.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted("postgres", "test_server");
        TestHelper.execute(INSERT_STMT, new String[0]);
        waitForAvailableRecords(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic.get(0), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic2.get(0), "pk", 1);
        stopConnector();
        assertConnectorNotRunning();
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL_ONLY.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
        assertConnectorIsRunning();
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsMessage("Previous initial snapshot completed, no snapshot will be performed")).isTrue();
        });
    }

    @Test
    @SkipWhenDecoderPluginNameIs(value = SkipWhenDecoderPluginNameIs.DecoderPluginName.WAL2JSON, reason = "No need for db write to complete catch-up phase")
    @FixFor({"DBZ-2094"})
    public void shouldResumeStreamingFromSlotPositionForCustomSnapshot() throws Exception {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomStartFromStreamingTestSnapshot.class.getName()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
        waitForStreamingRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic.get(0), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic2.get(0), "pk", 1);
        stopConnector();
        TestHelper.execute(INSERT_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomStartFromStreamingTestSnapshot.class.getName()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(6);
        List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic3.size()).isEqualTo(3);
        Assertions.assertThat(recordsForTopic4.size()).isEqualTo(3);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic3.get(0), "pk", 2);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic4.get(0), "pk", 2);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic3.get(1), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic3.get(2), "pk", 2);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic4.get(1), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic4.get(2), "pk", 2);
        TestHelper.assertNoOpenTransactions();
    }

    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.WAL2JSON, reason = "Requires db write to complete catch-up phase")
    @FixFor({"DBZ-2772"})
    public void shouldResumeStreamingFromSlotPositionForCustomSnapshotWal2Json() throws Exception {
        Testing.Print.enable();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomStartFromStreamingTestSnapshot.class.getName()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
        waitForStreamingRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic.get(0), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic2.get(0), "pk", 1);
        stopConnector();
        assertConnectorNotRunning();
        TestHelper.execute(INSERT_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomStartFromStreamingTestSnapshot.class.getName()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
        waitForStreamingRunning();
        TestHelper.execute("INSERT INTO s1.a (pk, aa) VALUES (1000, 1)", new String[0]);
        waitForSnapshotToBeCompleted();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(7);
        List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic3.size()).isEqualTo(4);
        Assertions.assertThat(recordsForTopic4.size()).isEqualTo(3);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic3.get(0), "pk", 2);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic4.get(0), "pk", 2);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic3.get(1), "pk", 1000);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic3.get(2), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic3.get(3), "pk", 2);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic4.get(1), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic4.get(2), "pk", 2);
        TestHelper.assertNoOpenTransactions();
    }

    @Test
    @SkipWhenDecoderPluginNameIs(value = SkipWhenDecoderPluginNameIs.DecoderPluginName.WAL2JSON, reason = "Fails due to DBZ-3158")
    @FixFor({"DBZ-2094"})
    public void customSnapshotterSkipsTablesOnRestart() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.ALWAYS.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
        waitForStreamingRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic.get(0), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic2.get(0), "pk", 1);
        stopConnector();
        TestHelper.execute(INSERT_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomPartialTableTestSnapshot.class.getName()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        waitForStreamingRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(4);
        List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic3.size()).isEqualTo(3);
        Assertions.assertThat(recordsForTopic4.size()).isEqualTo(1);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic3.get(0), "pk", 2);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic4.get(0), "pk", 2);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic3.get(1), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic3.get(2), "pk", 2);
        assertNoRecordsToConsume();
        TestHelper.assertNoOpenTransactions();
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsMessage("For table 's2.a' the select statement was not provided, skipping table")).isTrue();
        });
    }

    @Test
    @FixFor({"DBZ-2094"})
    public void customSnapshotterSkipsTablesOnRestartWithConcurrentTx() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        Testing.Print.enable();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.ALWAYS.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
        waitForStreamingRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic.get(0), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic2.get(0), "pk", 1);
        stopConnector();
        TestHelper.execute(INSERT_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomPartialTableTestSnapshot.class.getName()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE).build());
        assertConnectorIsRunning();
        Awaitility.await().alias("Streaming was not started on time").pollInterval(1000L, TimeUnit.MILLISECONDS).atMost(waitTimeForRecords() * 30, TimeUnit.SECONDS).ignoreException(InstanceNotFoundException.class).until(() -> {
            TestHelper.create().execute(new String[]{"vacuum full"}).close();
            return Boolean.valueOf(((Boolean) ManagementFactory.getPlatformMBeanServer().getAttribute(getSnapshotMetricsObjectName("postgres", "test_server"), "SnapshotCompleted")).booleanValue());
        });
        waitForStreamingRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(4);
        List recordsForTopic3 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic4 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic3.size()).isEqualTo(3);
        Assertions.assertThat(recordsForTopic4.size()).isEqualTo(1);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic3.get(0), "pk", 2);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic4.get(0), "pk", 2);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic3.get(1), "pk", 1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic3.get(2), "pk", 2);
        assertNoRecordsToConsume();
        TestHelper.assertNoOpenTransactions();
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsMessage("For table 's2.a' the select statement was not provided, skipping table")).isTrue();
        });
    }

    @Test
    @FixFor({"DBZ-2608"})
    public void testCustomSnapshotterSnapshotCompleteLifecycleHook() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1; CREATE TABLE s1.lifecycle_state (hook text, state text, PRIMARY KEY(hook));", new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.CUSTOM.getValue()).with(PostgresConnectorConfig.SNAPSHOT_MODE_CLASS, CustomLifecycleHookTestSnapshot.class.getName()).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        PostgresConnection create = TestHelper.create();
        Throwable th = null;
        try {
            try {
                TestCase.assertEquals(Collections.singletonList("complete"), (List) create.queryAndMap("SELECT state FROM s1.lifecycle_state WHERE hook like 'snapshotComplete'", resultSet -> {
                    ArrayList arrayList = new ArrayList();
                    while (resultSet.next()) {
                        arrayList.add(resultSet.getString(1));
                    }
                    return arrayList;
                }));
                if (create != null) {
                    if (0 == 0) {
                        create.close();
                        return;
                    }
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (create != null) {
                if (th != null) {
                    try {
                        create.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    create.close();
                }
            }
            throw th4;
        }
    }

    private String getConfirmedFlushLsn(PostgresConnection postgresConnection) throws SQLException {
        String str = (String) postgresConnection.prepareQueryAndMap("select * from pg_replication_slots where slot_name = ? and database = ? and plugin = ?", preparedStatement -> {
            preparedStatement.setString(1, "debezium");
            preparedStatement.setString(2, "postgres");
            preparedStatement.setString(3, TestHelper.decoderPlugin().getPostgresPluginName());
        }, resultSet -> {
            if (resultSet.next()) {
                return resultSet.getString("confirmed_flush_lsn");
            }
            Assert.fail("No replication slot info available");
            return null;
        });
        postgresConnection.rollback();
        return str;
    }

    private void assertFieldAbsent(SourceRecord sourceRecord, String str) {
        try {
            ((Struct) ((Struct) sourceRecord.value()).get("after")).get(str);
            Assert.fail("field should not be present");
        } catch (DataException e) {
        }
    }

    @Test
    @Ignore
    public void testStreamingPerformance() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
        assertConnectorIsRunning();
        batchInsertRecords(1000000L, 1000);
        CompletableFuture.runAsync(() -> {
            consumeRecords(1000000L);
        }).exceptionally(th -> {
            throw new RuntimeException(th);
        }).get();
    }

    private void consumeRecords(long j) {
        int i = 0;
        long currentTimeMillis = System.currentTimeMillis();
        while (i < j) {
            int consumeAvailableRecords = super.consumeAvailableRecords(sourceRecord -> {
            });
            if (consumeAvailableRecords > 0) {
                i += consumeAvailableRecords;
                System.out.println("consumed " + i + " records");
            }
        }
        System.out.println("total duration to ingest '" + j + "' records: " + Strings.duration(System.currentTimeMillis() - currentTimeMillis));
    }

    @Test
    @Ignore
    public void testSnapshotPerformance() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        Configuration.Builder with = TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL_ONLY.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE);
        batchInsertRecords(1000000L, 1000).get();
        start(PostgresConnector.class, with.build());
        assertConnectorIsRunning();
        CompletableFuture.runAsync(() -> {
            consumeRecords(1000000L);
        }).exceptionally(th -> {
            throw new RuntimeException(th);
        }).get();
    }

    @Test
    @FixFor({"DBZ-1242"})
    public void testEmptySchemaWarningAfterApplyingFilters() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL_ONLY.getValue()).with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "my_products").build());
        assertConnectorIsRunning();
        waitForAvailableRecords(10 * TestHelper.waitTimeForRecords() * 5, TimeUnit.MILLISECONDS);
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsWarnMessage("After applying the include/exclude list filters, no changes will be captured. Please check your configuration!")).isTrue();
        });
    }

    @Test
    @FixFor({"DBZ-1242"})
    public void testNoEmptySchemaWarningAfterApplyingFilters() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        TestHelper.dropAllSchemas();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL_ONLY.getValue()).build());
        assertConnectorIsRunning();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsWarnMessage("After applying the include/exclude list filters, no changes will be captured. Please check your configuration!")).isFalse();
        });
    }

    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication configuration only valid for PGOUTPUT decoder")
    @FixFor({"DBZ-1436"})
    public void testCustomPublicationNameUsed() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        TestHelper.dropAllSchemas();
        TestHelper.dropPublication("cdc");
        TestHelper.executeDDL("postgres_create_tables.ddl");
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.PUBLICATION_NAME, "cdc").build());
        assertConnectorIsRunning();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsMessage("Creating new publication 'cdc' for plugin 'PGOUTPUT'")).isTrue();
        });
        Assert.assertTrue(TestHelper.publicationExists("cdc"));
    }

    @Test
    @FixFor({"DBZ-1015"})
    public void shouldRewriteIdentityKey() throws InterruptedException {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1,s2").with(PostgresConnectorConfig.MSG_KEY_COLUMNS, "(.*)1.a:pk,aa").build());
        waitForSnapshotToBeCompleted();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        consumeRecordsByTopic.recordsForTopic("test_server.s1.a").forEach(sourceRecord -> {
            Struct struct = (Struct) sourceRecord.key();
            Assertions.assertThat(struct.get("pk")).isNotNull();
            Assertions.assertThat(struct.get("aa")).isNotNull();
        });
        consumeRecordsByTopic.recordsForTopic("test_server.s2.a").forEach(sourceRecord2 -> {
            Struct struct = (Struct) sourceRecord2.key();
            Assertions.assertThat(struct.get("pk")).isNotNull();
            Assertions.assertThat(struct.get("pk")).isNotNull();
            Assertions.assertThat(struct.schema().field("aa")).isNull();
        });
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1519"})
    public void shouldNotIssueWarningForNoMonitoredTablesAfterApplyingFilters() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        TestHelper.execute(INSERT_STMT, new String[0]);
        Configuration build = TestHelper.defaultConfig().with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s2").build();
        start(PostgresConnector.class, build);
        waitForSnapshotToBeCompleted();
        consumeRecordsByTopic(1);
        Assertions.assertThat(logInterceptor.containsMessage("After applying the include/exclude list filters, no changes will be captured. Please check your configuration!")).isFalse();
        stopConnector();
        start(PostgresConnector.class, build);
        waitForStreamingRunning();
        Assertions.assertThat(logInterceptor.containsMessage("After applying the include/exclude list filters, no changes will be captured. Please check your configuration!")).isFalse();
        stopConnector();
    }

    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.DECODERBUFS, reason = "Expected warning message is emitted by protobuf decoder")
    @FixFor({"DBZ-2865"})
    public void shouldClearDatabaseWarnings() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        TestHelper.execute(INSERT_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.POLL_INTERVAL_MS, "10").build());
        waitForSnapshotToBeCompleted();
        Awaitility.await().atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords() * 6)).until(() -> {
            return Boolean.valueOf(logInterceptor.containsMessage("Server-side message: 'Exiting startup callback'"));
        });
        stopConnector();
    }

    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication test specifically for pgoutput")
    @FixFor({"DBZ-1684"})
    public void shouldCreatePublicationWhenReplicationSlotExists() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.dropPublication();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.executeDDL("postgres_create_tables.ddl");
        Configuration build = TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, false).build();
        start(PostgresConnector.class, build);
        waitForStreamingRunning();
        Assert.assertTrue(TestHelper.publicationExists());
        stopConnector();
        TestHelper.dropPublication();
        LogInterceptor logInterceptor = new LogInterceptor();
        start(PostgresConnector.class, build);
        waitForStreamingRunning();
        Assert.assertTrue(TestHelper.publicationExists());
        stopConnector(z -> {
            Assertions.assertThat(logInterceptor.containsMessage("Creating new publication 'dbz_publication' for plugin 'PGOUTPUT'")).isTrue();
        });
    }

    @Test
    @FixFor({"DBZ-1685"})
    public void shouldConsumeEventsWithMaskedColumns() throws Exception {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with("column.mask.with.5.chars", "s2.a.bb").build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic.allRecordsInOrder().size()).isEqualTo(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.remove(0);
        VerifyRecord.isValidRead(sourceRecord, "pk", 1);
        Struct struct = (Struct) sourceRecord.value();
        if (struct.getStruct("after") != null) {
            Assertions.assertThat(struct.getStruct("after").getString("bb")).isEqualTo("*****");
        }
        TestHelper.execute("INSERT INTO s2.a (aa,bb) VALUES (1, 'test');", new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.topics().size()).isEqualTo(1);
        List recordsForTopic2 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(1);
        SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.remove(0);
        VerifyRecord.isValidInsert(sourceRecord2, "pk", 2);
        Struct struct2 = (Struct) sourceRecord2.value();
        if (struct2.getStruct("after") != null) {
            Assertions.assertThat(struct2.getStruct("after").getString("bb")).isEqualTo("*****");
        }
        TestHelper.execute("UPDATE s2.a SET aa=2, bb='hello' WHERE pk=2;", new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic3.topics().size()).isEqualTo(1);
        List recordsForTopic3 = consumeRecordsByTopic3.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic3.size()).isEqualTo(1);
        SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic3.remove(0);
        VerifyRecord.isValidUpdate(sourceRecord3, "pk", 2);
        Struct struct3 = (Struct) sourceRecord3.value();
        if (struct3.getStruct("before") != null) {
            Assertions.assertThat(struct3.getStruct("before").getString("bb")).isEqualTo("*****");
        }
        if (struct3.getStruct("after") != null) {
            Assertions.assertThat(struct3.getStruct("after").getString("bb")).isEqualTo("*****");
        }
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1692"})
    public void shouldConsumeEventsWithMaskedHashedColumns() throws Exception {
        TestHelper.execute("DROP SCHEMA IF EXISTS s1 CASCADE;DROP SCHEMA IF EXISTS s2 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s2.a (pk SERIAL, aa integer, bb varchar(20), PRIMARY KEY(pk));INSERT INTO s1.a (aa) VALUES (1);INSERT INTO s2.a (aa) VALUES (1);CREATE TABLE s2.b (pk SERIAL, bb varchar(255), PRIMARY KEY(pk));", new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with("column.mask.hash.SHA-256.with.salt.CzQMA0cB5K", "s2.a.bb, s2.b.bb").build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic.allRecordsInOrder().size()).isEqualTo(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic.remove(0);
        VerifyRecord.isValidRead(sourceRecord, "pk", 1);
        Struct struct = (Struct) sourceRecord.value();
        if (struct.getStruct("after") != null) {
            Assertions.assertThat(struct.getStruct("after").getString("bb")).isNull();
        }
        TestHelper.execute("INSERT INTO s2.a (aa,bb) VALUES (1, 'test');", new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.topics().size()).isEqualTo(1);
        List recordsForTopic2 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(1);
        SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic2.remove(0);
        VerifyRecord.isValidInsert(sourceRecord2, "pk", 2);
        Struct struct2 = (Struct) sourceRecord2.value();
        if (struct2.getStruct("after") != null) {
            Assertions.assertThat(struct2.getStruct("after").getString("bb")).isEqualTo("8e68c68edbbac316dfe2");
        }
        TestHelper.execute("UPDATE s2.a SET aa=2, bb='hello' WHERE pk=2;", new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic3.topics().size()).isEqualTo(1);
        List recordsForTopic3 = consumeRecordsByTopic3.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic3.size()).isEqualTo(1);
        SourceRecord sourceRecord3 = (SourceRecord) recordsForTopic3.remove(0);
        VerifyRecord.isValidUpdate(sourceRecord3, "pk", 2);
        Struct struct3 = (Struct) sourceRecord3.value();
        if (struct3.getStruct("after") != null) {
            Assertions.assertThat(struct3.getStruct("after").getString("bb")).isEqualTo("b4d39ab0d198fb4cac8b");
        }
        TestHelper.execute("INSERT INTO s2.b (bb) VALUES ('hello');", new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic4 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic4.topics().size()).isEqualTo(1);
        List recordsForTopic4 = consumeRecordsByTopic4.recordsForTopic(TestHelper.topicName("s2.b"));
        Assertions.assertThat(recordsForTopic4.size()).isEqualTo(1);
        SourceRecord sourceRecord4 = (SourceRecord) recordsForTopic4.remove(0);
        VerifyRecord.isValidInsert(sourceRecord4, "pk", 1);
        Struct struct4 = (Struct) sourceRecord4.value();
        if (struct4.getStruct("before") != null) {
            Assertions.assertThat(struct4.getStruct("before").getString("bb")).isNull();
        }
        if (struct4.getStruct("after") != null) {
            Assertions.assertThat(struct4.getStruct("after").getString("bb")).isEqualTo("b4d39ab0d198fb4cac8b2f023da74f670bcaf192dcc79b5d6361b7ae6b2fafdf");
        }
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1972"})
    public void shouldConsumeEventsWithTruncatedColumns() throws Exception {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with("column.truncate.to.3.chars", "s2.a.bb").build());
        assertConnectorIsRunning();
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(2);
        Assertions.assertThat(consumeRecordsByTopic.allRecordsInOrder().size()).isEqualTo(2);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(1);
        VerifyRecord.isValidRead((SourceRecord) recordsForTopic.remove(0), "pk", 1);
        TestHelper.execute("INSERT INTO s2.a (aa,bb) VALUES (1, 'test');", new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic2 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic2.topics().size()).isEqualTo(1);
        List recordsForTopic2 = consumeRecordsByTopic2.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(1);
        SourceRecord sourceRecord = (SourceRecord) recordsForTopic2.remove(0);
        VerifyRecord.isValidInsert(sourceRecord, "pk", 2);
        Struct struct = (Struct) sourceRecord.value();
        if (struct.getStruct("after") != null) {
            Assertions.assertThat(struct.getStruct("after").getString("bb")).isEqualTo("tes");
        }
        TestHelper.execute("UPDATE s2.a SET aa=2, bb='hello' WHERE pk=2;", new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic3 = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic3.topics().size()).isEqualTo(1);
        List recordsForTopic3 = consumeRecordsByTopic3.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic3.size()).isEqualTo(1);
        SourceRecord sourceRecord2 = (SourceRecord) recordsForTopic3.remove(0);
        VerifyRecord.isValidUpdate(sourceRecord2, "pk", 2);
        Struct struct2 = (Struct) sourceRecord2.value();
        if (struct2.getStruct("before") != null && struct2.getStruct("before").getString("bb") != null) {
            Assertions.assertThat(struct2.getStruct("before").getString("bb")).isEqualTo("tes");
        }
        if (struct2.getStruct("after") != null) {
            Assertions.assertThat(struct2.getStruct("after").getString("bb")).isEqualTo("hel");
        }
        stopConnector();
    }

    @Test
    @FixFor({"DBZ-1292"})
    @SkipWhenKafkaVersion(check = EqualityCheck.EQUAL, value = SkipWhenKafkaVersion.KafkaVersion.KAFKA_1XX, description = "Not compatible with Kafka 1.x")
    public void shouldOutputRecordsInCloudEventsFormat() throws Exception {
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.INITIAL.getValue()).with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        for (SourceRecord sourceRecord : consumeRecordsByTopic(2).allRecordsInOrder()) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord, false);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord, "postgresql", "test_server", false);
        }
        waitForStreamingRunning();
        TestHelper.execute(INSERT_STMT, new String[0]);
        Testing.Print.enable();
        ArrayList<SourceRecord> arrayList = new ArrayList();
        Awaitility.await().atMost(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS).until(() -> {
            SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic(2).allRecordsInOrder().get(1);
            if (sourceRecord2.topic().endsWith(".transaction")) {
                return false;
            }
            arrayList.add(sourceRecord2);
            return true;
        });
        arrayList.add(consumeRecordsByTopic(2).allRecordsInOrder().get(0));
        for (SourceRecord sourceRecord2 : arrayList) {
            CloudEventsConverterTest.shouldConvertToCloudEventsInJson(sourceRecord2, true);
            CloudEventsConverterTest.shouldConvertToCloudEventsInJsonWithDataAsAvro(sourceRecord2, true);
            CloudEventsConverterTest.shouldConvertToCloudEventsInAvro(sourceRecord2, "postgresql", "test_server", true);
        }
        stopConnector();
    }

    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication configuration only valid for PGOUTPUT decoder")
    @FixFor({"DBZ-1813"})
    public void shouldConfigureSubscriptionsForAllTablesByDefault() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        TestHelper.dropAllSchemas();
        TestHelper.dropPublication("cdc");
        TestHelper.executeDDL("postgres_create_tables.ddl");
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.PUBLICATION_NAME, "cdc").build());
        assertConnectorIsRunning();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        stopConnector(z -> {
            Assert.assertTrue(logInterceptor.containsMessage("Creating Publication with statement 'CREATE PUBLICATION cdc FOR ALL TABLES;'") && logInterceptor.containsMessage("Creating new publication 'cdc' for plugin 'PGOUTPUT'"));
        });
        Assert.assertTrue(TestHelper.publicationExists("cdc"));
    }

    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication configuration only valid for PGOUTPUT decoder")
    @FixFor({"DBZ-1813"})
    public void shouldConfigureSubscriptionsFromTableFilters() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        TestHelper.dropAllSchemas();
        TestHelper.dropPublication("cdc");
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.PUBLICATION_NAME, "cdc").with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.numeric_table,public.text_table,s1.a,s2.a").with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.FILTERED.getValue()).build());
        assertConnectorIsRunning();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        assertRecordsFromSnapshot(2, 1, 1);
        TestHelper.execute(INSERT_STMT, new String[0]);
        assertRecordsAfterInsert(2, 2, 2);
        stopConnector(z -> {
            Assert.assertTrue(logInterceptor.containsMessage("Creating Publication with statement 'CREATE PUBLICATION cdc FOR TABLE \"public\".\"numeric_table\", \"public\".\"text_table\", \"s1\".\"a\", \"s2\".\"a\";'"));
            Assert.assertTrue(logInterceptor.containsMessage("Creating new publication 'cdc' for plugin 'PGOUTPUT'"));
        });
        Assert.assertTrue(TestHelper.publicationExists("cdc"));
    }

    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication configuration only valid for PGOUTPUT decoder")
    @FixFor({"DBZ-1813"})
    public void shouldThrowWhenAutocreationIsDisabled() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.dropPublication("cdc");
        Configuration.Builder with = TestHelper.defaultConfig().with(PostgresConnectorConfig.SLOT_NAME, "cdc").with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.DISABLED.getValue());
        start(PostgresConnector.class, with.build(), (z, str, th) -> {
            TestCase.assertEquals(th.getClass(), ConnectException.class);
            TestCase.assertEquals(th.getMessage(), "Publication autocreation is disabled, please create one and restart the connector.");
        });
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        stopConnector();
        Assert.assertFalse(TestHelper.publicationExists("cdc"));
    }

    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication configuration only valid for PGOUTPUT decoder")
    @FixFor({"DBZ-1813"})
    public void shouldProduceMessagesOnlyForConfiguredTables() throws Exception {
        TestHelper.dropAllSchemas();
        TestHelper.dropPublication("cdc");
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.PUBLICATION_NAME, "cdc").with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "s2.a").with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.FILTERED.getValue()).build());
        assertConnectorIsRunning();
        waitForSnapshotToBeCompleted();
        consumeRecordsByTopic(1);
        TestHelper.execute(INSERT_STMT, new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(1);
        Assertions.assertThat(consumeRecordsByTopic.topics()).hasSize(1);
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic).isNull();
        Assertions.assertThat(recordsForTopic2).hasSize(1);
        VerifyRecord.isValidInsert((SourceRecord) recordsForTopic2.get(0), "pk", 2);
        stopConnector();
    }

    @Test
    @SkipWhenDecoderPluginNameIsNot(value = SkipWhenDecoderPluginNameIsNot.DecoderPluginName.PGOUTPUT, reason = "Publication configuration only valid for PGOUTPUT decoder")
    @FixFor({"DBZ-2885"})
    public void shouldThrowWhenTableFiltersIsEmpty() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor();
        TestHelper.dropAllSchemas();
        TestHelper.dropPublication("cdc");
        TestHelper.executeDDL("postgres_create_tables.ddl");
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.PUBLICATION_NAME, "cdc").with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.FILTERED.getValue()).with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "nonexistent.table").build());
        assertConnectorNotRunning();
        Assert.assertTrue(logInterceptor.containsStacktraceElement("No table filters found for filtered publication cdc"));
    }

    private CompletableFuture<Void> batchInsertRecords(long j, int i) {
        String str = "INSERT INTO text_table(j, jb, x, u) VALUES ('{\"bar\": \"baz\"}'::json, '{\"bar\": \"baz\"}'::jsonb, '<foo>bar</foo><foo>bar</foo>'::xml, 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::UUID);";
        return CompletableFuture.runAsync(() -> {
            StringBuilder sb = new StringBuilder();
            for (int i2 = 0; i2 < j; i2++) {
                sb.append(str).append(System.lineSeparator());
                if (i2 > 0 && i2 % i == 0) {
                    System.out.println("inserting batch [" + (i2 - i) + "," + i2 + "]");
                    TestHelper.execute(sb.toString(), new String[0]);
                    sb.delete(0, sb.length());
                }
            }
            System.out.println("inserting batch [" + (j - i) + "," + j + "]");
            TestHelper.execute(sb.toString(), new String[0]);
            sb.delete(0, sb.length());
        }).exceptionally(th -> {
            throw new RuntimeException(th);
        });
    }

    private Predicate<SourceRecord> stopOnPKPredicate(int i) {
        return sourceRecord -> {
            return ((Integer) ((Struct) sourceRecord.key()).get("pk")).intValue() == i;
        };
    }

    private void assertRecordsFromSnapshot(int i, int... iArr) throws InterruptedException {
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(i);
        Assertions.assertThat(consumeRecordsByTopic.allRecordsInOrder().size()).isEqualTo(i);
        int i2 = i / 2;
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(i2);
        IntStream.range(0, i2).forEach(i3 -> {
            VerifyRecord.isValidRead((SourceRecord) recordsForTopic.remove(0), "pk", iArr[i3]);
        });
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(i2);
        IntStream.range(0, i2).forEach(i4 -> {
            VerifyRecord.isValidRead((SourceRecord) recordsForTopic2.remove(0), "pk", iArr[i4 + i2]);
        });
    }

    private void assertRecordsAfterInsert(int i, int... iArr) throws InterruptedException {
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(i);
        Assertions.assertThat(consumeRecordsByTopic.topics().size()).isEqualTo(i);
        int i2 = i / 2;
        List recordsForTopic = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s1.a"));
        Assertions.assertThat(recordsForTopic.size()).isEqualTo(i2);
        IntStream.range(0, i2).forEach(i3 -> {
            VerifyRecord.isValidInsert((SourceRecord) recordsForTopic.remove(0), "pk", iArr[i3]);
        });
        List recordsForTopic2 = consumeRecordsByTopic.recordsForTopic(TestHelper.topicName("s2.a"));
        Assertions.assertThat(recordsForTopic2.size()).isEqualTo(i2);
        IntStream.range(0, i2).forEach(i4 -> {
            VerifyRecord.isValidInsert((SourceRecord) recordsForTopic2.remove(0), "pk", iArr[i4]);
        });
    }

    protected void assertSourceInfoMicrosecondTransactionTimestamp(SourceRecord sourceRecord, long j, long j2) {
        Assert.assertTrue(sourceRecord.value() instanceof Struct);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("source");
        System.out.println("TS_USEC\t" + struct.getInt64("ts_usec"));
        Assert.assertTrue(Math.abs(j - struct.getInt64("ts_usec").longValue()) < j2);
    }

    protected void assertSourceInfoMillisecondTransactionTimestamp(SourceRecord sourceRecord, long j, long j2) {
        Assert.assertTrue(sourceRecord.value() instanceof Struct);
        Struct struct = ((Struct) sourceRecord.value()).getStruct("source");
        System.out.println("TS_MS\t" + struct.getInt64("ts_ms"));
        Assert.assertTrue(Math.abs(j - struct.getInt64("ts_ms").longValue()) < j2);
    }

    private <T> void validateField(Config config, Field field, T t) {
        assertNoConfigurationErrors(config, new Field[]{field});
        Object value = configValue(config, field.name()).value();
        if (value == null) {
            value = field.defaultValue();
        }
        if (t == null) {
            Assertions.assertThat(value).isNull();
        } else if (t instanceof EnumeratedValue) {
            Assertions.assertThat(((EnumeratedValue) t).getValue()).isEqualTo(value.toString());
        } else {
            Assertions.assertThat(t).isEqualTo(value);
        }
    }

    private void validateFieldDef(Field field) {
        ConfigDef config = this.connector.config();
        Assertions.assertThat(config.names()).contains(new Object[]{field.name()});
        ConfigDef.ConfigKey configKey = (ConfigDef.ConfigKey) config.configKeys().get(field.name());
        Assertions.assertThat(configKey).isNotNull();
        Assertions.assertThat(configKey.name).isEqualTo(field.name());
        Assertions.assertThat(configKey.displayName).isEqualTo(field.displayName());
        Assertions.assertThat(configKey.importance).isEqualTo(field.importance());
        Assertions.assertThat(configKey.documentation).isEqualTo(field.description());
        Assertions.assertThat(configKey.type).isEqualTo(field.type());
        Assertions.assertThat(configKey.defaultValue).isEqualTo(field.defaultValue());
        Assertions.assertThat(configKey.dependents).isEqualTo(field.dependents());
        Assertions.assertThat(configKey.width).isNotNull();
        Assertions.assertThat(configKey.group).isNotNull();
        Assertions.assertThat(configKey.orderInGroup).isGreaterThan(0);
        Assertions.assertThat(configKey.validator).isNull();
        Assertions.assertThat(configKey.recommender).isNull();
    }

    private void waitForSnapshotToBeCompleted() throws InterruptedException {
        waitForSnapshotToBeCompleted("postgres", "test_server");
    }

    private void waitForStreamingRunning() throws InterruptedException {
        waitForStreamingRunning("postgres", "test_server");
    }
}
