package io.confluent.connect.jdbc.sink;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.dialect.SqliteDatabaseDialect;
import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.sink.SqliteHelper;
import io.confluent.connect.jdbc.util.TableDefinition;
import io.confluent.connect.jdbc.util.TableId;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import junit.framework.TestCase;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/confluent/connect/jdbc/sink/JdbcDbWriterTest.class */
/**
 * Tests for {@link JdbcDbWriter} that run against the SQLite database managed by
 * {@link SqliteHelper}.
 *
 * <p>Each test builds a writer from a map of sink configuration properties, writes one or more
 * {@link SinkRecord}s and then inspects the resulting table, either through the dialect's table
 * description or through a plain SQL query.
 */
public class JdbcDbWriterTest {
    /** Topic used by the "books" tests; with auto-create the table queried below is {@code books}. */
    private static final String BOOKS_TOPIC = "books";
    /** Query used to count the rows written by the "books" tests. */
    private static final String COUNT_BOOKS_SQL = "select count(*) from books";

    private final SqliteHelper sqliteHelper = new SqliteHelper(getClass().getSimpleName());
    /** Writer under test; created by each test and closed in {@link #tearDown()}. */
    private JdbcDbWriter writer = null;
    /** Dialect created together with the writer in {@link #newWriter(Map)}. */
    private DatabaseDialect dialect;

    @Before
    public void setUp() throws IOException, SQLException {
        sqliteHelper.setUp();
    }

    @After
    public void tearDown() throws IOException, SQLException {
        // A test may fail before it gets to create the writer.
        if (writer != null) {
            writer.closeQuietly();
        }
        sqliteHelper.tearDown();
    }

    /**
     * Creates a writer (and the matching SQLite dialect, stored in {@link #dialect}) from the
     * given sink configuration properties.
     *
     * @param map sink connector configuration properties
     * @return a new writer backed by a fresh {@link DbStructure}
     */
    private JdbcDbWriter newWriter(Map<String, String> map) {
        JdbcSinkConfig config = new JdbcSinkConfig(map);
        dialect = new SqliteDatabaseDialect(config);
        return new JdbcDbWriter(config, dialect, new DbStructure(dialect));
    }

    /**
     * Writes a record with auto-create enabled, then a second record whose value schema has two
     * additional fields, and checks after each write that {@code id} is the primary key and that
     * every value field has a column definition.
     */
    @Test
    public void autoCreateWithAutoEvolve() throws SQLException {
        TableId tableId = new TableId((String) null, (String) null, BOOKS_TOPIC);

        Map<String, String> props = new HashMap<>();
        props.put("connection.url", sqliteHelper.sqliteUri());
        props.put("auto.create", "true");
        props.put("auto.evolve", "true");
        props.put("pk.mode", "record_key");
        props.put("pk.fields", "id");
        writer = newWriter(props);

        Schema keySchema = Schema.INT64_SCHEMA;

        Schema valueSchema1 = SchemaBuilder.struct()
                .field("author", Schema.STRING_SCHEMA)
                .field("title", Schema.STRING_SCHEMA)
                .build();
        Struct value1 = new Struct(valueSchema1)
                .put("author", "Tom Robbins")
                .put("title", "Villa Incognito");
        writer.write(Collections.singleton(
                new SinkRecord(BOOKS_TOPIC, 0, keySchema, 1L, valueSchema1, value1, 0L)));

        TableDefinition metadata =
                dialect.describeTable(writer.cachedConnectionProvider.getConnection(), tableId);
        assertPkAndColumns(metadata, valueSchema1);

        // Same fields as before plus an optional field and a field with a default value.
        Schema valueSchema2 = SchemaBuilder.struct()
                .field("author", Schema.STRING_SCHEMA)
                .field("title", Schema.STRING_SCHEMA)
                .field("year", Schema.OPTIONAL_INT32_SCHEMA)
                .field("review", SchemaBuilder.string().defaultValue("").build());
        Struct value2 = new Struct(valueSchema2)
                .put("author", "Tom Robbins")
                .put("title", "Fierce Invalids")
                .put("year", 2016);
        writer.write(Collections.singleton(
                new SinkRecord(BOOKS_TOPIC, 0, keySchema, 2L, valueSchema2, value2, 0L)));

        TableDefinition refreshedMetadata = dialect.describeTable(sqliteHelper.connection, tableId);
        assertPkAndColumns(refreshedMetadata, valueSchema2);
    }

    /** Inserting the same record twice with {@code kafka} primary keys must raise {@link SQLException}. */
    @Test(expected = SQLException.class)
    public void multiInsertWithKafkaPkFailsDueToUniqueConstraint() throws SQLException {
        writeSameRecordTwiceExpectingSingleUpdate(
                JdbcSinkConfig.InsertMode.INSERT, JdbcSinkConfig.PrimaryKeyMode.KAFKA, "");
    }

    /** Upserting the same record twice with {@code kafka} primary keys leaves a single row. */
    @Test
    public void idempotentUpsertWithKafkaPk() throws SQLException {
        writeSameRecordTwiceExpectingSingleUpdate(
                JdbcSinkConfig.InsertMode.UPSERT, JdbcSinkConfig.PrimaryKeyMode.KAFKA, "");
    }

    /** Inserting the same record twice with record-key primary keys must raise {@link SQLException}. */
    @Test(expected = SQLException.class)
    public void multiInsertWithRecordKeyPkFailsDueToUniqueConstraint() throws SQLException {
        writeSameRecordTwiceExpectingSingleUpdate(
                JdbcSinkConfig.InsertMode.INSERT, JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY, "");
    }

    /** Upserting the same record twice with record-key primary keys leaves a single row. */
    @Test
    public void idempotentUpsertWithRecordKeyPk() throws SQLException {
        writeSameRecordTwiceExpectingSingleUpdate(
                JdbcSinkConfig.InsertMode.UPSERT, JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY, "");
    }

    /** Inserting the same record twice with record-value primary keys must raise {@link SQLException}. */
    @Test(expected = SQLException.class)
    public void multiInsertWithRecordValuePkFailsDueToUniqueConstraint() throws SQLException {
        writeSameRecordTwiceExpectingSingleUpdate(
                JdbcSinkConfig.InsertMode.INSERT, JdbcSinkConfig.PrimaryKeyMode.RECORD_VALUE, "author,title");
    }

    /** Upserting the same record twice with record-value primary keys leaves a single row. */
    @Test
    public void idempotentUpsertWithRecordValuePk() throws SQLException {
        writeSameRecordTwiceExpectingSingleUpdate(
                JdbcSinkConfig.InsertMode.UPSERT, JdbcSinkConfig.PrimaryKeyMode.RECORD_VALUE, "author,title");
    }

    /**
     * Writes two copies of the same book record in one batch and asserts that the {@code books}
     * table ends up with exactly one row.
     *
     * @param insertMode     value for {@code insert.mode}
     * @param primaryKeyMode value for {@code pk.mode}
     * @param pkFields       value for {@code pk.fields} (may be empty)
     * @throws SQLException if the write or the verification query fails
     */
    private void writeSameRecordTwiceExpectingSingleUpdate(
            JdbcSinkConfig.InsertMode insertMode,
            JdbcSinkConfig.PrimaryKeyMode primaryKeyMode,
            String pkFields) throws SQLException {
        Map<String, String> props = new HashMap<>();
        props.put("connection.url", sqliteHelper.sqliteUri());
        props.put("auto.create", "true");
        props.put("pk.mode", primaryKeyMode.toString());
        props.put("pk.fields", pkFields);
        props.put("insert.mode", insertMode.toString());
        writer = newWriter(props);

        writer.write(Collections.nCopies(2, bookRecord(bookKey())));

        assertBooksRowCount(1L);
    }

    /**
     * Writes the same book twice and then its tombstone (null value) twice, and asserts that the
     * {@code books} table is empty afterwards.
     */
    @Test
    public void idempotentDeletes() throws SQLException {
        writer = newWriter(deleteEnabledUpsertProps());

        Struct key = bookKey();
        writer.write(Collections.nCopies(2, bookRecord(key)));
        writer.write(Collections.nCopies(2, tombstoneFor(key)));

        assertBooksRowCount(0L);
    }

    /** Writes a book and then its tombstone, and asserts that the {@code books} table is empty. */
    @Test
    public void insertDeleteSameRecord() throws SQLException {
        writer = newWriter(deleteEnabledUpsertProps());

        Struct key = bookKey();
        writer.write(Collections.singletonList(bookRecord(key)));
        writer.write(Collections.singletonList(tombstoneFor(key)));

        assertBooksRowCount(0L);
    }

    /**
     * Writes a book, its tombstone and the same book again, and asserts that the {@code books}
     * table holds exactly one row.
     */
    @Test
    public void insertDeleteInsertSameRecord() throws SQLException {
        writer = newWriter(deleteEnabledUpsertProps());

        Struct key = bookKey();
        SinkRecord record = bookRecord(key);
        writer.write(Collections.singletonList(record));
        writer.write(Collections.singletonList(tombstoneFor(key)));
        writer.write(Collections.singletonList(record));

        assertBooksRowCount(1L);
    }

    /**
     * Writes a random number of copies of one record covering every primitive and logical type
     * into a pre-created table and checks that each stored row reads back with the original values.
     */
    @Test
    public void sameRecordNTimes() throws SQLException {
        sqliteHelper.deleteTable("sameRecordNTimes");
        sqliteHelper.createTable("CREATE TABLE sameRecordNTimes (    the_byte  INTEGER,    the_short INTEGER,    the_int INTEGER,    the_long INTEGER,    the_float REAL,    the_double REAL,    the_bool  INTEGER,    the_string TEXT,    the_bytes BLOB,     the_decimal  NUMERIC,    the_date  NUMERIC,    the_time  NUMERIC,    the_timestamp  NUMERIC);");

        Schema schema = SchemaBuilder.struct().name("sameRecordNTimes")
                .field("the_byte", Schema.INT8_SCHEMA)
                .field("the_short", Schema.INT16_SCHEMA)
                .field("the_int", Schema.INT32_SCHEMA)
                .field("the_long", Schema.INT64_SCHEMA)
                .field("the_float", Schema.FLOAT32_SCHEMA)
                .field("the_double", Schema.FLOAT64_SCHEMA)
                .field("the_bool", Schema.BOOLEAN_SCHEMA)
                .field("the_string", Schema.STRING_SCHEMA)
                .field("the_bytes", Schema.BYTES_SCHEMA)
                .field("the_decimal", Decimal.schema(2))
                .field("the_date", Date.SCHEMA)
                .field("the_time", Time.SCHEMA)
                .field("the_timestamp", Timestamp.SCHEMA);

        // One instant is reused for the date, time and timestamp fields.
        java.util.Date instant = new java.util.Date(1474661402123L);

        final Struct struct = new Struct(schema)
                .put("the_byte", (byte) -32)
                .put("the_short", (short) 1234)
                .put("the_int", 42)
                .put("the_long", 12425436L)
                .put("the_float", 2356.3f)
                .put("the_double", -2436546.56457d)
                .put("the_bool", true)
                .put("the_string", "foo")
                .put("the_bytes", new byte[]{-32, 124})
                .put("the_decimal", new BigDecimal("1234.567"))
                .put("the_date", instant)
                .put("the_time", instant)
                .put("the_timestamp", instant);

        int numRecords = ThreadLocalRandom.current().nextInt(20, 80);

        Map<String, String> props = new HashMap<>();
        props.put("connection.url", sqliteHelper.sqliteUri());
        props.put("table.name.format", "sameRecordNTimes");
        // The batch size may be smaller or larger than the number of records written.
        props.put("batch.size", String.valueOf(ThreadLocalRandom.current().nextInt(20, 100)));
        writer = newWriter(props);

        writer.write(Collections.nCopies(
                numRecords, new SinkRecord("topic", 0, (Schema) null, (Object) null, schema, struct, 0L)));

        Assert.assertEquals(numRecords, sqliteHelper.select("SELECT * FROM sameRecordNTimes", new SqliteHelper.ResultSetReadCallback() {
            @Override
            public void read(ResultSet rs) throws SQLException {
                Assert.assertEquals(struct.getInt8("the_byte").byteValue(), rs.getByte("the_byte"));
                Assert.assertEquals(struct.getInt16("the_short").shortValue(), rs.getShort("the_short"));
                Assert.assertEquals(struct.getInt32("the_int").intValue(), rs.getInt("the_int"));
                Assert.assertEquals(struct.getInt64("the_long").longValue(), rs.getLong("the_long"));
                Assert.assertEquals(struct.getFloat32("the_float").floatValue(), rs.getFloat("the_float"), 0.01d);
                Assert.assertEquals(struct.getFloat64("the_double").doubleValue(), rs.getDouble("the_double"), 0.01d);
                Assert.assertEquals(struct.getBoolean("the_bool"), Boolean.valueOf(rs.getBoolean("the_bool")));
                Assert.assertEquals(struct.getString("the_string"), rs.getString("the_string"));
                Assert.assertArrayEquals(struct.getBytes("the_bytes"), rs.getBytes("the_bytes"));
                Assert.assertEquals(struct.get("the_decimal"), rs.getBigDecimal("the_decimal"));
                Assert.assertEquals(new java.sql.Date(((java.util.Date) struct.get("the_date")).getTime()), rs.getDate("the_date"));
                Assert.assertEquals(new java.sql.Time(((java.util.Date) struct.get("the_time")).getTime()), rs.getTime("the_time"));
                // Compare the timestamp column against the timestamp field (not the time field).
                Assert.assertEquals(new java.sql.Timestamp(((java.util.Date) struct.get("the_timestamp")).getTime()), rs.getTimestamp("the_timestamp"));
            }
        }));
    }

    /**
     * Asserts that {@code id} is a primary key column of {@code table} and that every field of
     * {@code valueSchema} has a column definition.
     */
    private static void assertPkAndColumns(TableDefinition table, Schema valueSchema) {
        Assert.assertTrue(table.definitionForColumn("id").isPrimaryKey());
        for (Field field : valueSchema.fields()) {
            Assert.assertNotNull(table.definitionForColumn(field.name()));
        }
    }

    /** Builds the configuration shared by the delete tests: auto-create, deletes enabled, record-key PK, upsert. */
    private Map<String, String> deleteEnabledUpsertProps() {
        Map<String, String> props = new HashMap<>();
        props.put("connection.url", sqliteHelper.sqliteUri());
        props.put("auto.create", "true");
        props.put("delete.enabled", "true");
        props.put("pk.mode", "record_key");
        props.put("insert.mode", "upsert");
        return props;
    }

    /** Builds the struct key {@code {id: 0}}; its schema is available through {@link Struct#schema()}. */
    private static Struct bookKey() {
        Schema keySchema = SchemaBuilder.struct().field("id", Schema.INT64_SCHEMA);
        return new Struct(keySchema).put("id", 0L);
    }

    /** Builds the "Villa Incognito" book record for the given key (topic books, partition 7, offset 42). */
    private static SinkRecord bookRecord(Struct key) {
        Schema valueSchema = SchemaBuilder.struct()
                .field("author", Schema.STRING_SCHEMA)
                .field("title", Schema.STRING_SCHEMA)
                .build();
        Struct value = new Struct(valueSchema)
                .put("author", "Tom Robbins")
                .put("title", "Villa Incognito");
        return new SinkRecord(BOOKS_TOPIC, 7, key.schema(), key, valueSchema, value, 42L);
    }

    /** Builds a record with the given key and a null value schema and value (a tombstone). */
    private static SinkRecord tombstoneFor(Struct key) {
        return new SinkRecord(BOOKS_TOPIC, 7, key.schema(), key, (Schema) null, (Object) null, 42L);
    }

    /**
     * Runs {@code select count(*) from books} and asserts that the reported count equals
     * {@code expectedCount}. The value returned by {@code select} is asserted to be 1, presumably
     * the number of result rows visited.
     */
    private void assertBooksRowCount(final long expectedCount) throws SQLException {
        Assert.assertEquals(1L, sqliteHelper.select(COUNT_BOOKS_SQL, new SqliteHelper.ResultSetReadCallback() {
            @Override
            public void read(ResultSet rs) throws SQLException {
                Assert.assertEquals(expectedCount, rs.getInt(1));
            }
        }));
    }
}
