/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.jdbc.sink;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.dialect.SqliteDatabaseDialect;
import io.confluent.connect.jdbc.sink.DbStructure;
import io.confluent.connect.jdbc.sink.JdbcDbWriter;
import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.sink.SqliteHelper;
import io.confluent.connect.jdbc.util.TableDefinition;
import io.confluent.connect.jdbc.util.TableId;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import junit.framework.TestCase;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class JdbcDbWriterTest {
    private final SqliteHelper sqliteHelper = new SqliteHelper(this.getClass().getSimpleName());
    private JdbcDbWriter writer = null;
    private DatabaseDialect dialect;

    @Before
    public void setUp() throws IOException, SQLException {
        this.sqliteHelper.setUp();
    }

    @After
    public void tearDown() throws IOException, SQLException {
        if (this.writer != null) {
            this.writer.closeQuietly();
        }
        this.sqliteHelper.tearDown();
    }

    private JdbcDbWriter newWriter(Map<String, String> props) {
        JdbcSinkConfig config = new JdbcSinkConfig(props);
        this.dialect = new SqliteDatabaseDialect((AbstractConfig)config);
        DbStructure dbStructure = new DbStructure(this.dialect);
        return new JdbcDbWriter(config, this.dialect, dbStructure);
    }

    @Test
    public void autoCreateWithAutoEvolve() throws SQLException {
        String topic = "books";
        TableId tableId = new TableId(null, null, topic);
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("connection.url", this.sqliteHelper.sqliteUri());
        props.put("auto.create", "true");
        props.put("auto.evolve", "true");
        props.put("pk.mode", "record_key");
        props.put("pk.fields", "id");
        this.writer = this.newWriter(props);
        Schema keySchema = Schema.INT64_SCHEMA;
        Schema valueSchema1 = SchemaBuilder.struct().field("author", Schema.STRING_SCHEMA).field("title", Schema.STRING_SCHEMA).build();
        Struct valueStruct1 = new Struct(valueSchema1).put("author", (Object)"Tom Robbins").put("title", (Object)"Villa Incognito");
        this.writer.write(Collections.singleton(new SinkRecord(topic, 0, keySchema, (Object)1L, valueSchema1, (Object)valueStruct1, 0L)));
        TableDefinition metadata = this.dialect.describeTable(this.writer.cachedConnectionProvider.getConnection(), tableId);
        TestCase.assertTrue((boolean)metadata.definitionForColumn("id").isPrimaryKey());
        for (Field field : valueSchema1.fields()) {
            Assert.assertNotNull((Object)metadata.definitionForColumn(field.name()));
        }
        SchemaBuilder valueSchema2 = SchemaBuilder.struct().field("author", Schema.STRING_SCHEMA).field("title", Schema.STRING_SCHEMA).field("year", Schema.OPTIONAL_INT32_SCHEMA).field("review", SchemaBuilder.string().defaultValue((Object)"").build());
        Struct valueStruct2 = new Struct((Schema)valueSchema2).put("author", (Object)"Tom Robbins").put("title", (Object)"Fierce Invalids").put("year", (Object)2016);
        this.writer.write(Collections.singleton(new SinkRecord(topic, 0, keySchema, (Object)2L, (Schema)valueSchema2, (Object)valueStruct2, 0L)));
        TableDefinition refreshedMetadata = this.dialect.describeTable(this.sqliteHelper.connection, tableId);
        TestCase.assertTrue((boolean)refreshedMetadata.definitionForColumn("id").isPrimaryKey());
        for (Field field : valueSchema2.fields()) {
            Assert.assertNotNull((Object)refreshedMetadata.definitionForColumn(field.name()));
        }
    }

    @Test(expected=SQLException.class)
    public void multiInsertWithKafkaPkFailsDueToUniqueConstraint() throws SQLException {
        this.writeSameRecordTwiceExpectingSingleUpdate(JdbcSinkConfig.InsertMode.INSERT, JdbcSinkConfig.PrimaryKeyMode.KAFKA, "");
    }

    @Test
    public void idempotentUpsertWithKafkaPk() throws SQLException {
        this.writeSameRecordTwiceExpectingSingleUpdate(JdbcSinkConfig.InsertMode.UPSERT, JdbcSinkConfig.PrimaryKeyMode.KAFKA, "");
    }

    @Test(expected=SQLException.class)
    public void multiInsertWithRecordKeyPkFailsDueToUniqueConstraint() throws SQLException {
        this.writeSameRecordTwiceExpectingSingleUpdate(JdbcSinkConfig.InsertMode.INSERT, JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY, "");
    }

    @Test
    public void idempotentUpsertWithRecordKeyPk() throws SQLException {
        this.writeSameRecordTwiceExpectingSingleUpdate(JdbcSinkConfig.InsertMode.UPSERT, JdbcSinkConfig.PrimaryKeyMode.RECORD_KEY, "");
    }

    @Test(expected=SQLException.class)
    public void multiInsertWithRecordValuePkFailsDueToUniqueConstraint() throws SQLException {
        this.writeSameRecordTwiceExpectingSingleUpdate(JdbcSinkConfig.InsertMode.INSERT, JdbcSinkConfig.PrimaryKeyMode.RECORD_VALUE, "author,title");
    }

    @Test
    public void idempotentUpsertWithRecordValuePk() throws SQLException {
        this.writeSameRecordTwiceExpectingSingleUpdate(JdbcSinkConfig.InsertMode.UPSERT, JdbcSinkConfig.PrimaryKeyMode.RECORD_VALUE, "author,title");
    }

    private void writeSameRecordTwiceExpectingSingleUpdate(JdbcSinkConfig.InsertMode insertMode, JdbcSinkConfig.PrimaryKeyMode pkMode, String pkFields) throws SQLException {
        String topic = "books";
        int partition = 7;
        long offset = 42L;
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("connection.url", this.sqliteHelper.sqliteUri());
        props.put("auto.create", "true");
        props.put("pk.mode", pkMode.toString());
        props.put("pk.fields", pkFields);
        props.put("insert.mode", insertMode.toString());
        this.writer = this.newWriter(props);
        SchemaBuilder keySchema = SchemaBuilder.struct().field("id", SchemaBuilder.INT64_SCHEMA);
        Struct keyStruct = new Struct((Schema)keySchema).put("id", (Object)0L);
        Schema valueSchema = SchemaBuilder.struct().field("author", Schema.STRING_SCHEMA).field("title", Schema.STRING_SCHEMA).build();
        Struct valueStruct = new Struct(valueSchema).put("author", (Object)"Tom Robbins").put("title", (Object)"Villa Incognito");
        SinkRecord record = new SinkRecord(topic, partition, (Schema)keySchema, (Object)keyStruct, valueSchema, (Object)valueStruct, offset);
        this.writer.write(Collections.nCopies(2, record));
        Assert.assertEquals((long)1L, (long)this.sqliteHelper.select("select count(*) from books", new SqliteHelper.ResultSetReadCallback(){

            @Override
            public void read(ResultSet rs) throws SQLException {
                Assert.assertEquals((long)1L, (long)rs.getInt(1));
            }
        }));
    }

    @Test
    public void sameRecordNTimes() throws SQLException {
        String testId = "sameRecordNTimes";
        String createTable = "CREATE TABLE " + testId + " (    the_byte  INTEGER,    the_short INTEGER,    the_int INTEGER,    the_long INTEGER,    the_float REAL,    the_double REAL,    the_bool  INTEGER,    the_string TEXT,    the_bytes BLOB,     the_decimal  NUMERIC,    the_date  NUMERIC,    the_time  NUMERIC,    the_timestamp  NUMERIC);";
        this.sqliteHelper.deleteTable(testId);
        this.sqliteHelper.createTable(createTable);
        SchemaBuilder schema = SchemaBuilder.struct().name(testId).field("the_byte", Schema.INT8_SCHEMA).field("the_short", Schema.INT16_SCHEMA).field("the_int", Schema.INT32_SCHEMA).field("the_long", Schema.INT64_SCHEMA).field("the_float", Schema.FLOAT32_SCHEMA).field("the_double", Schema.FLOAT64_SCHEMA).field("the_bool", Schema.BOOLEAN_SCHEMA).field("the_string", Schema.STRING_SCHEMA).field("the_bytes", Schema.BYTES_SCHEMA).field("the_decimal", Decimal.schema((int)2).schema()).field("the_date", org.apache.kafka.connect.data.Date.SCHEMA).field("the_time", Time.SCHEMA).field("the_timestamp", org.apache.kafka.connect.data.Timestamp.SCHEMA);
        java.util.Date instant = new java.util.Date(1474661402123L);
        final Struct struct = new Struct((Schema)schema).put("the_byte", (Object)-32).put("the_short", (Object)1234).put("the_int", (Object)42).put("the_long", (Object)12425436L).put("the_float", (Object)Float.valueOf(2356.3f)).put("the_double", (Object)-2436546.56457).put("the_bool", (Object)true).put("the_string", (Object)"foo").put("the_bytes", (Object)new byte[]{-32, 124}).put("the_decimal", (Object)new BigDecimal("1234.567")).put("the_date", (Object)instant).put("the_time", (Object)instant).put("the_timestamp", (Object)instant);
        int numRecords = ThreadLocalRandom.current().nextInt(20, 80);
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("connection.url", this.sqliteHelper.sqliteUri());
        props.put("table.name.format", testId);
        props.put("batch.size", String.valueOf(ThreadLocalRandom.current().nextInt(20, 100)));
        this.writer = this.newWriter(props);
        this.writer.write(Collections.nCopies(numRecords, new SinkRecord("topic", 0, null, null, (Schema)schema, (Object)struct, 0L)));
        Assert.assertEquals((long)numRecords, (long)this.sqliteHelper.select("SELECT * FROM " + testId, new SqliteHelper.ResultSetReadCallback(){

            @Override
            public void read(ResultSet rs) throws SQLException {
                Assert.assertEquals((long)struct.getInt8("the_byte").byteValue(), (long)rs.getByte("the_byte"));
                Assert.assertEquals((long)struct.getInt16("the_short").shortValue(), (long)rs.getShort("the_short"));
                Assert.assertEquals((long)struct.getInt32("the_int").intValue(), (long)rs.getInt("the_int"));
                Assert.assertEquals((long)struct.getInt64("the_long"), (long)rs.getLong("the_long"));
                Assert.assertEquals((double)struct.getFloat32("the_float").floatValue(), (double)rs.getFloat("the_float"), (double)0.01);
                Assert.assertEquals((double)struct.getFloat64("the_double"), (double)rs.getDouble("the_double"), (double)0.01);
                Assert.assertEquals((Object)struct.getBoolean("the_bool"), (Object)rs.getBoolean("the_bool"));
                Assert.assertEquals((Object)struct.getString("the_string"), (Object)rs.getString("the_string"));
                Assert.assertArrayEquals((byte[])struct.getBytes("the_bytes"), (byte[])rs.getBytes("the_bytes"));
                Assert.assertEquals((Object)struct.get("the_decimal"), (Object)rs.getBigDecimal("the_decimal"));
                Assert.assertEquals((Object)new Date(((java.util.Date)struct.get("the_date")).getTime()), (Object)rs.getDate("the_date"));
                Assert.assertEquals((Object)new java.sql.Time(((java.util.Date)struct.get("the_time")).getTime()), (Object)rs.getTime("the_time"));
                Assert.assertEquals((Object)new Timestamp(((java.util.Date)struct.get("the_time")).getTime()), (Object)rs.getTimestamp("the_timestamp"));
            }
        }));
    }
}

