package io.confluent.connect.jdbc.sink.integration;

import io.confluent.connect.jdbc.integration.BaseConnectorIT;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.storage.StringConverter;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.FixedHostPortGenericContainer;

/* loaded from: input_file:io/confluent/connect/jdbc/sink/integration/MicrosoftSqlServerSinkIT.class */
public class MicrosoftSqlServerSinkIT extends BaseConnectorIT {
    private static final String CONNECTOR_NAME = "jdbc-sink-connector";
    private static final String MSSQL_URL = "jdbc:sqlserver://0.0.0.0:1433";
    private Map<String, String> props;
    private Connection connection;
    private JsonConverter jsonConverter;
    private static final String USER = "sa";
    private static final Logger log = LoggerFactory.getLogger(MicrosoftSqlServerSinkIT.class);
    private static final String PASS = "reallyStrongPwd123";

    @ClassRule
    public static final FixedHostPortGenericContainer mssqlServer = new FixedHostPortGenericContainer("mcr.microsoft.com/mssql/server:2019-latest").withEnv("ACCEPT_EULA", "Y").withEnv("SA_PASSWORD", PASS).withFixedExposedPort(1433, 1433);

    @Before
    public void setup() throws Exception {
        Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
        this.connection = DriverManager.getConnection(MSSQL_URL, USER, PASS);
        startConnect();
        this.jsonConverter = jsonConverter();
    }

    @After
    public void close() throws SQLException {
        this.connect.deleteConnector(CONNECTOR_NAME);
        this.connection.close();
        stopConnect();
    }

    @Test
    public void verifyConnectorFailsWhenTableNameS() throws Exception {
        this.props = configProperties("example_table");
        executeSQL(this.connection.prepareStatement("CREATE TABLE guest.example_table (id int NULL, last_name VARCHAR(50), created_at DATETIME2 NOT NULL);"));
        this.connect.kafka().createTopic("example_table", 1);
        configureAndWaitForConnector(1);
        Timestamp from = Timestamp.from(ZonedDateTime.of(2017, 12, 8, 19, 34, 56, 0, ZoneId.of("UTC")).toInstant());
        Schema build = SchemaBuilder.struct().name("com.example.Person").field("id", Schema.INT32_SCHEMA).field("last_name", Schema.STRING_SCHEMA).field("created_at", org.apache.kafka.connect.data.Timestamp.SCHEMA).build();
        this.connect.kafka().produce("example_table", (String) null, new String(this.jsonConverter.fromConnectData("example_table", build, new Struct(build).put("id", 1).put("last_name", "Brams").put("created_at", from))));
        assertTasksFailedWithTrace(CONNECTOR_NAME, 1, "Table \"dbo\".\"example_table\" is missing and auto-creation is disabled");
    }

    @Test
    public void verifyNullBYTESValue() throws Exception {
        this.props = configProperties("optional_bytes");
        this.props.put("auto.create", "true");
        this.connect.kafka().createTopic("optional_bytes", 1);
        configureAndWaitForConnector(1);
        Schema build = SchemaBuilder.struct().name("com.example.OptionalBytes").field("id", Schema.INT32_SCHEMA).field("optional_bytes", Schema.OPTIONAL_BYTES_SCHEMA).build();
        this.connect.kafka().produce("optional_bytes", (String) null, new String(this.jsonConverter.fromConnectData("optional_bytes", build, new Struct(build).put("id", 1).put("optional_bytes", (Object) null))));
        waitForCommittedRecords(CONNECTOR_NAME, Collections.singleton("optional_bytes"), 1L, 1, TimeUnit.MINUTES.toMillis(2L));
    }

    @Test
    public void verifyUnicodeWorksWhenSendStringParametersAsUnicodeEqualsFalse() throws Exception {
        this.props = configProperties("table_with_unicode_fields");
        executeSQL(this.connection.prepareStatement("CREATE TABLE dbo.table_with_unicode_fields (id int, unicode_field NVARCHAR(50));"));
        this.props.put("connection.url", "jdbc:sqlserver://0.0.0.0:1433;sendStringParametersAsUnicode=false");
        this.connect.kafka().createTopic("table_with_unicode_fields", 1);
        configureAndWaitForConnector(1);
        Schema build = SchemaBuilder.struct().name("com.example.UnicodeField").field("id", Schema.INT32_SCHEMA).field("unicode_field", Schema.OPTIONAL_STRING_SCHEMA).build();
        Struct put = new Struct(build).put("id", 1).put("unicode_field", "एम एस सीक्वल सर्वर");
        this.connect.kafka().produce("table_with_unicode_fields", (String) null, new String(this.jsonConverter.fromConnectData("table_with_unicode_fields", build, put)));
        waitForCommittedRecords(CONNECTOR_NAME, Collections.singleton("table_with_unicode_fields"), 1L, 1, TimeUnit.MINUTES.toMillis(2L));
        Statement createStatement = this.connection.createStatement();
        Throwable th = null;
        try {
            ResultSet executeQuery = createStatement.executeQuery("SELECT * FROM dbo.table_with_unicode_fields");
            Throwable th2 = null;
            try {
                try {
                    Assert.assertTrue(executeQuery.next());
                    Assert.assertEquals(put.getInt32("id").intValue(), executeQuery.getInt("id"));
                    Assert.assertEquals(put.getString("unicode_field"), executeQuery.getNString("unicode_field"));
                    if (executeQuery != null) {
                        if (0 != 0) {
                            try {
                                executeQuery.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            executeQuery.close();
                        }
                    }
                    if (createStatement != null) {
                        if (0 == 0) {
                            createStatement.close();
                            return;
                        }
                        try {
                            createStatement.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (executeQuery != null) {
                    if (th2 != null) {
                        try {
                            executeQuery.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        executeQuery.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (createStatement != null) {
                if (0 != 0) {
                    try {
                        createStatement.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    createStatement.close();
                }
            }
            throw th8;
        }
    }

    private Map<String, String> configProperties(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("connector.class", "JdbcSinkConnector");
        hashMap.put("key.converter", StringConverter.class.getName());
        hashMap.put("value.converter", JsonConverter.class.getName());
        hashMap.put("confluent.topic.bootstrap.servers", this.connect.kafka().bootstrapServers());
        hashMap.put("confluent.topic.replication.factor", "1");
        hashMap.put("connection.url", MSSQL_URL);
        hashMap.put("connection.user", USER);
        hashMap.put("connection.password", PASS);
        hashMap.put("pk.mode", "none");
        hashMap.put("topics", str);
        return hashMap;
    }

    private void executeSQL(PreparedStatement preparedStatement) throws Exception {
        try {
            preparedStatement.executeUpdate();
        } catch (Exception e) {
            log.error("Could not execute SQL: " + preparedStatement.toString());
            throw e;
        }
    }

    private void configureAndWaitForConnector(int i) throws Exception {
        this.connect.configureConnector(CONNECTOR_NAME, this.props);
        waitForConnectorToStart(CONNECTOR_NAME, i);
    }
}
