package io.debezium.connector.postgresql;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.CustomActionProvider;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.signal.actions.Log;
import java.lang.management.ManagementFactory;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/debezium/connector/postgresql/SignalsIT.class */
public class SignalsIT extends AbstractConnectorTest {
    private static final String INSERT_STMT = "INSERT INTO s1.a (aa) VALUES (1);";
    private static final String SETUP_TABLES_STMT = "DROP SCHEMA IF EXISTS s1 CASCADE;CREATE SCHEMA s1; CREATE SCHEMA s2; CREATE TABLE s1.a (pk SERIAL, aa integer, PRIMARY KEY(pk));CREATE TABLE s1.debezium_signal (id varchar(32), type varchar(32), data varchar(2048));INSERT INTO s1.a (aa) VALUES (1);";

    @Before
    public void before() throws SQLException {
        TestHelper.dropAllSchemas();
        initializeConnectorTestFramework();
    }

    @After
    public void after() {
        stopConnector();
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.dropPublication();
    }

    @Test
    public void signalLog() throws InterruptedException {
        LogInterceptor logInterceptor = new LogInterceptor(Log.class);
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal").with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, "500").build());
        assertConnectorIsRunning();
        TestHelper.waitForDefaultReplicationSlotBeActive();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        assertNoRecordsToConsume();
        TestHelper.execute(INSERT_STMT, new String[0]);
        TestHelper.execute("INSERT INTO s1.debezium_signal VALUES('1', 'log', '{\"message\": \"Signal message at offset ''{}''\"}')", new String[0]);
        waitForAvailableRecords(800L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(consumeRecordsByTopic(2).allRecordsInOrder()).hasSize(2);
        Assertions.assertThat(logInterceptor.containsMessage("Signal message at offset")).isTrue();
    }

    @Test
    public void signalingDisabled() throws InterruptedException {
        LogInterceptor logInterceptor = new LogInterceptor(Log.class);
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal").with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, "500").with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "").build());
        assertConnectorIsRunning();
        TestHelper.waitForDefaultReplicationSlotBeActive();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        assertNoRecordsToConsume();
        TestHelper.execute("INSERT INTO s1.debezium_signal VALUES('1', 'log', '{\"message\": \"Signal message\"}')", new String[0]);
        Awaitility.await().pollDelay(2000L, TimeUnit.MILLISECONDS).until(() -> {
            return true;
        });
        TestHelper.execute(INSERT_STMT, new String[0]);
        Assertions.assertThat(consumeRecordsByTopic(2).allRecordsInOrder()).hasSize(2);
        Assertions.assertThat(logInterceptor.containsMessage("Signal message")).isFalse();
    }

    @Test
    public void signalSchemaChange() throws InterruptedException {
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(PostgresConnectorConfig.SIGNAL_DATA_COLLECTION, "s1.debezium_signal").with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, "500").build());
        assertConnectorIsRunning();
        TestHelper.waitForDefaultReplicationSlotBeActive();
        waitForAvailableRecords(100L, TimeUnit.MILLISECONDS);
        assertNoRecordsToConsume();
        TestHelper.execute(INSERT_STMT, new String[0]);
        TestHelper.execute("INSERT INTO s1.debezium_signal VALUES('1', 'schema-changes', '{\"database\": \"postgres\", \"changes\": [{\n  \"type\" : \"ALTER\",\n  \"id\" : \"\\\"s1\\\".\\\"a\\\"\",\n  \"table\" : {\n    \"defaultCharsetName\" : null,\n    \"primaryKeyColumnNames\" : [ \"pk\", \"aa\" ],\n    \"columns\" : [ {\n      \"name\" : \"pk\",\n      \"jdbcType\" : 4,\n      \"nativeType\" : 23,\n      \"typeName\" : \"serial\",\n      \"typeExpression\" : \"serial\",\n      \"charsetName\" : null,\n      \"length\" : 10,\n      \"scale\" : 0,\n      \"position\" : 1,\n      \"optional\" : false,\n      \"autoIncremented\" : true,\n      \"generated\" : false\n    }, {\n      \"name\" : \"aa\",\n      \"jdbcType\" : 4,\n      \"nativeType\" : 23,\n      \"typeName\" : \"int4\",\n      \"typeExpression\" : \"int4\",\n      \"charsetName\" : null,\n      \"length\" : 10,\n      \"scale\" : 0,\n      \"position\" : 2,\n      \"optional\" : true,\n      \"autoIncremented\" : false,\n      \"generated\" : false\n    } ]\n  }\n}]}')", new String[0]);
        Awaitility.await().pollDelay(2000L, TimeUnit.MILLISECONDS).until(() -> {
            return true;
        });
        TestHelper.execute(INSERT_STMT, new String[0]);
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(3);
        Assertions.assertThat(consumeRecordsByTopic.allRecordsInOrder()).hasSize(3);
        SourceRecord sourceRecord = (SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(0);
        SourceRecord sourceRecord2 = (SourceRecord) consumeRecordsByTopic.allRecordsInOrder().get(2);
        Assertions.assertThat(((Struct) sourceRecord.key()).schema().fields()).hasSize(1);
        Struct struct = (Struct) sourceRecord2.key();
        Assertions.assertThat(struct.schema().fields()).hasSize(2);
        Assertions.assertThat(struct.schema().field("pk")).isNotNull();
        Assertions.assertThat(struct.schema().field("aa")).isNotNull();
    }

    @Test
    public void jmxSignals() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(Log.class);
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, "500").with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "jmx").build());
        assertConnectorIsRunning();
        TestHelper.waitForDefaultReplicationSlotBeActive();
        sendLogSignalWithJmx("1", "log", "{\"message\": \"Signal message at offset ''{}''\"}");
        waitForAvailableRecords(800L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(logInterceptor.containsMessage("Signal message at offset")).isTrue();
    }

    @Test
    public void customAction() throws Exception {
        LogInterceptor logInterceptor = new LogInterceptor(CustomActionProvider.CustomAction.class);
        TestHelper.dropDefaultReplicationSlot();
        TestHelper.execute(SETUP_TABLES_STMT, new String[0]);
        start(PostgresConnector.class, TestHelper.defaultConfig().with(PostgresConnectorConfig.SNAPSHOT_MODE, PostgresConnectorConfig.SnapshotMode.NEVER.getValue()).with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE).with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, "500").with(CommonConnectorConfig.SIGNAL_ENABLED_CHANNELS, "jmx").build());
        assertConnectorIsRunning();
        TestHelper.waitForDefaultReplicationSlotBeActive();
        sendLogSignalWithJmx("1", "customLog", "{\"message\": \"Signal message at offset ''{}''\"}");
        waitForAvailableRecords(800L, TimeUnit.MILLISECONDS);
        Assertions.assertThat(logInterceptor.containsMessage("[CustomLog]")).isTrue();
    }

    private void sendLogSignalWithJmx(String str, String str2, String str3) throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, MBeanException {
        ManagementFactory.getPlatformMBeanServer().invoke(new ObjectName("debezium.postgres:type=management, context=signals, server=test_server"), "signal", new Object[]{str, str2, str3}, new String[]{String.class.getName(), String.class.getName(), String.class.getName()});
    }
}
