/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sources.debezium;

import com.google.common.base.Preconditions;
import java.util.Map;
import org.apache.pulsar.tests.integration.containers.DebeziumMsSqlContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.io.sources.SourceTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.util.Strings;

/**
 * Source tester that drives a Debezium Microsoft SQL Server source connector
 * against a {@link DebeziumMsSqlContainer}.
 *
 * <p>The constructor fills in the connector configuration; {@link #prepareSource()}
 * creates the test database, enables change data capture (CDC) on it and on the
 * {@code customers} table; the {@code prepare*Event} methods then run one
 * insert / update / delete statement each through {@code sqlcmd} inside the container.
 */
public class DebeziumMsSqlSourceTester
extends SourceTester<DebeziumMsSqlContainer> {
    private static final Logger log = LoggerFactory.getLogger(DebeziumMsSqlSourceTester.class);

    /** Name passed to the super constructor; also used as network alias and database hostname. */
    private static final String NAME = "debezium-mssql";

    // Values shared between the connector configuration and the sqlcmd command line.
    // Keeping them in one place prevents the two from drifting apart.
    private static final String DB_NAME = "TestDB";
    private static final String DB_USER = "sa";
    private static final String DB_PASSWORD = "p@ssw0rD";
    private static final String DB_PORT = "1433";

    private final String pulsarServiceUrl;
    private DebeziumMsSqlContainer debeziumMsSqlContainer;
    private final PulsarCluster pulsarCluster;

    /**
     * Creates the tester and populates the source connector configuration.
     *
     * @param cluster the Pulsar cluster used to start the SQL Server service container
     */
    public DebeziumMsSqlSourceTester(PulsarCluster cluster) {
        super(NAME);
        this.pulsarCluster = cluster;
        this.numEntriesToInsert = 1;
        this.numEntriesExpectAfterStart = 0;
        this.pulsarServiceUrl = "pulsar://pulsar-proxy:6650";
        this.sourceConfig.put("database.hostname", NAME);
        this.sourceConfig.put("database.port", DB_PORT);
        this.sourceConfig.put("database.user", DB_USER);
        this.sourceConfig.put("database.password", DB_PASSWORD);
        this.sourceConfig.put("database.server.name", "mssql");
        this.sourceConfig.put("database.dbname", DB_NAME);
        this.sourceConfig.put("snapshot.mode", "schema_only");
        this.sourceConfig.put("database.history.pulsar.service.url", this.pulsarServiceUrl);
        this.sourceConfig.put("topic.namespace", "debezium/mssql");
    }

    /**
     * Registers the SQL Server container and starts it as a service of the cluster.
     *
     * @param container the container to start
     * @throws IllegalStateException if a container has already been set
     */
    @Override
    public void setServiceContainer(DebeziumMsSqlContainer container) {
        log.info("start debezium MS SQL server container.");
        Preconditions.checkState(this.debeziumMsSqlContainer == null);
        this.debeziumMsSqlContainer = container;
        this.pulsarCluster.startService(NAME, this.debeziumMsSqlContainer);
    }

    /**
     * Creates the test database and the {@code customers} table, and enables CDC on both.
     *
     * @throws Exception if executing a command inside the container fails
     */
    @Override
    public void prepareSource() throws Exception {
        // The database does not exist yet, so this one command must not be prefixed with "USE TestDB;".
        this.runSqlCmd("CREATE DATABASE TestDB;", false);
        this.runSqlCmd("EXEC sys.sp_cdc_enable_db;");
        ContainerExecResult res = this.runSqlCmd("SELECT is_cdc_enabled FROM sys.databases WHERE database_id = DB_ID();");
        // The check is a plain substring match on the sqlcmd text output.
        Assert.assertTrue(res.getStdout().contains(" 1"),
                "CDC does not appear to be enabled on " + DB_NAME + "; sqlcmd output: " + res.getStdout());
        this.runSqlCmd("CREATE TABLE customers (id INT NOT NULL  IDENTITY  PRIMARY KEY, first_name VARCHAR(255) NOT NULL, last_name VARCHAR(255) NOT NULL, email VARCHAR(255) NOT NULL);");
        this.runSqlCmd("EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers', @role_name = NULL, @supports_net_changes = 0, @capture_instance = 'dbo_customers_v2';");
        this.runSqlCmd("EXEC sys.sp_cdc_start_job;");
    }

    /**
     * Runs a SQL statement against the test database.
     *
     * @param cmd the SQL text to execute
     * @return the result of the container exec
     * @throws Exception if executing the command inside the container fails
     */
    private ContainerExecResult runSqlCmd(String cmd) throws Exception {
        return this.runSqlCmd(cmd, true);
    }

    /**
     * Runs a SQL statement through {@code sqlcmd} inside the SQL Server container and logs the output.
     *
     * <p>The statement is embedded in a double-quoted shell argument, so {@code cmd}
     * must not itself contain unescaped double quotes.
     *
     * @param cmd the SQL text to execute
     * @param useTestDb when {@code true}, the statement is prefixed with {@code USE TestDB; }
     * @return the result of the container exec
     * @throws Exception if executing the command inside the container fails
     */
    private ContainerExecResult runSqlCmd(String cmd, boolean useTestDb) throws Exception {
        log.info("Executing \"{}\"", cmd);
        String dbPrefix = useTestDb ? "USE " + DB_NAME + "; " : "";
        String sqlCmdLine = "/opt/mssql-tools/bin/sqlcmd -S localhost -U " + DB_USER
                + " -P \"" + DB_PASSWORD + "\" -Q \"" + dbPrefix + cmd + "\"";
        ContainerExecResult response = this.debeziumMsSqlContainer.execCmd("/bin/bash", "-c", sqlCmdLine);
        if (Strings.isNullOrEmpty(response.getStderr())) {
            log.info("Result of \"{}\":\n{}", cmd, response.getStdout());
        } else {
            // Anything on stderr is only logged at WARN level; it does not fail the call.
            log.warn("Result of \"{}\":\n{}\n{}", cmd, response.getStdout(), response.getStderr());
        }
        return response;
    }

    /**
     * Inserts one {@code customers} row and then selects it so the result shows up in the log.
     *
     * @throws Exception if executing a command inside the container fails
     */
    @Override
    public void prepareInsertEvent() throws Exception {
        this.runSqlCmd("INSERT INTO customers (first_name, last_name, email) VALUES ('John', 'Doe', 'jdoe@null.dev');");
        this.runSqlCmd("SELECT * FROM customers WHERE last_name='Doe';");
    }

    /**
     * Deletes the {@code customers} rows with last name 'Doe' and then selects them
     * so the result shows up in the log.
     *
     * @throws Exception if executing a command inside the container fails
     */
    @Override
    public void prepareDeleteEvent() throws Exception {
        this.runSqlCmd("DELETE FROM customers WHERE last_name='Doe';");
        this.runSqlCmd("SELECT * FROM customers WHERE last_name='Doe';");
    }

    /**
     * Updates the first name of the {@code customers} rows with last name 'Doe' and then
     * selects them so the result shows up in the log.
     *
     * @throws Exception if executing a command inside the container fails
     */
    @Override
    public void prepareUpdateEvent() throws Exception {
        this.runSqlCmd("UPDATE customers SET first_name='Jack' WHERE last_name='Doe';");
        this.runSqlCmd("SELECT * FROM customers WHERE last_name='Doe';");
    }

    /**
     * Produces no messages; this tester only logs a notice.
     *
     * @param numMessages ignored
     * @return always {@code null}
     */
    @Override
    public Map<String, String> produceSourceMessages(int numMessages) {
        log.info("debezium MS SQL server already contains preconfigured data.");
        // NOTE(review): null is returned deliberately to preserve existing behaviour;
        // confirm callers before switching to an empty map.
        return null;
    }

    /**
     * @return the initial delay before receiving messages, {@code 30}
     *         (unit is defined by the caller; presumably seconds - TODO confirm)
     */
    @Override
    public int initialDelayForMsgReceive() {
        return 30;
    }

    /** @return the substring expected in message keys */
    @Override
    public String keyContains() {
        return "mssql.dbo.customers.Key";
    }

    /** @return the substring expected in message values */
    @Override
    public String valueContains() {
        return "mssql.dbo.customers.Value";
    }

    /**
     * Stops the SQL Server service container if one was started and clears the reference,
     * so a second call is a no-op.
     */
    @Override
    public void close() {
        if (this.pulsarCluster != null && this.debeziumMsSqlContainer != null) {
            PulsarCluster.stopService(NAME, this.debeziumMsSqlContainer);
            this.debeziumMsSqlContainer = null;
        }
    }

    /** @return the current service container, or {@code null} if none is set or it was closed */
    public DebeziumMsSqlContainer getDebeziumMsSqlContainer() {
        return this.debeziumMsSqlContainer;
    }
}

