/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.jdbc.sink;

import io.confluent.connect.jdbc.sink.JdbcDbWriter;
import io.confluent.connect.jdbc.sink.JdbcSinkTask;
import io.confluent.connect.jdbc.sink.SqliteHelper;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Set;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class JdbcSinkTaskTest
extends EasyMockSupport {
    private final SqliteHelper sqliteHelper = new SqliteHelper(((Object)((Object)this)).getClass().getSimpleName());
    private static final Schema SCHEMA = SchemaBuilder.struct().name("com.example.Person").field("firstName", Schema.STRING_SCHEMA).field("lastName", Schema.STRING_SCHEMA).field("age", Schema.OPTIONAL_INT32_SCHEMA).field("bool", Schema.OPTIONAL_BOOLEAN_SCHEMA).field("short", Schema.OPTIONAL_INT16_SCHEMA).field("byte", Schema.OPTIONAL_INT8_SCHEMA).field("long", Schema.OPTIONAL_INT64_SCHEMA).field("float", Schema.OPTIONAL_FLOAT32_SCHEMA).field("double", Schema.OPTIONAL_FLOAT64_SCHEMA).build();

    @Before
    public void setUp() throws IOException, SQLException {
        this.sqliteHelper.setUp();
    }

    @After
    public void tearDown() throws IOException, SQLException {
        this.sqliteHelper.tearDown();
    }

    @Test
    public void putPropagatesToDbWithAutoCreateAndPkModeKafka() throws Exception {
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("connection.url", this.sqliteHelper.sqliteUri());
        props.put("auto.create", "true");
        props.put("pk.mode", "kafka");
        props.put("pk.fields", "kafka_topic,kafka_partition,kafka_offset");
        JdbcSinkTask task = new JdbcSinkTask();
        task.initialize((SinkTaskContext)this.mock(SinkTaskContext.class));
        task.start(props);
        final Struct struct = new Struct(SCHEMA).put("firstName", (Object)"Alex").put("lastName", (Object)"Smith").put("bool", (Object)true).put("short", (Object)1234).put("byte", (Object)-32).put("long", (Object)12425436L).put("float", (Object)Float.valueOf(2356.3f)).put("double", (Object)-2436546.56457).put("age", (Object)21);
        String topic = "atopic";
        task.put(Collections.singleton(new SinkRecord("atopic", 1, null, null, SCHEMA, (Object)struct, 42L)));
        Assert.assertEquals((long)1L, (long)this.sqliteHelper.select("SELECT * FROM atopic", new SqliteHelper.ResultSetReadCallback(){

            @Override
            public void read(ResultSet rs) throws SQLException {
                Assert.assertEquals((Object)"atopic", (Object)rs.getString("kafka_topic"));
                Assert.assertEquals((long)1L, (long)rs.getInt("kafka_partition"));
                Assert.assertEquals((long)42L, (long)rs.getLong("kafka_offset"));
                Assert.assertEquals((Object)struct.getString("firstName"), (Object)rs.getString("firstName"));
                Assert.assertEquals((Object)struct.getString("lastName"), (Object)rs.getString("lastName"));
                Assert.assertEquals((Object)struct.getBoolean("bool"), (Object)rs.getBoolean("bool"));
                Assert.assertEquals((long)struct.getInt8("byte").byteValue(), (long)rs.getByte("byte"));
                Assert.assertEquals((long)struct.getInt16("short").shortValue(), (long)rs.getShort("short"));
                Assert.assertEquals((long)struct.getInt32("age").intValue(), (long)rs.getInt("age"));
                Assert.assertEquals((long)struct.getInt64("long"), (long)rs.getLong("long"));
                Assert.assertEquals((double)struct.getFloat32("float").floatValue(), (double)rs.getFloat("float"), (double)0.01);
                Assert.assertEquals((double)struct.getFloat64("double"), (double)rs.getDouble("double"), (double)0.01);
            }
        }));
    }

    @Test
    public void putPropagatesToDbWithPkModeRecordValue() throws Exception {
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("connection.url", this.sqliteHelper.sqliteUri());
        props.put("pk.mode", "record_value");
        props.put("pk.fields", "firstName,lastName");
        JdbcSinkTask task = new JdbcSinkTask();
        task.initialize((SinkTaskContext)this.mock(SinkTaskContext.class));
        String topic = "atopic";
        this.sqliteHelper.createTable("CREATE TABLE atopic(    firstName  TEXT,    lastName  TEXT,    age INTEGER,    bool  NUMERIC,    byte  INTEGER,    short INTEGER NULL,    long INTEGER,    float NUMERIC,    double NUMERIC,    bytes BLOB, PRIMARY KEY (firstName, lastName));");
        task.start(props);
        final Struct struct = new Struct(SCHEMA).put("firstName", (Object)"Christina").put("lastName", (Object)"Brams").put("bool", (Object)false).put("byte", (Object)-72).put("long", (Object)8594L).put("double", (Object)3256677.56457).put("age", (Object)28);
        task.put(Collections.singleton(new SinkRecord("atopic", 1, null, null, SCHEMA, (Object)struct, 43L)));
        Assert.assertEquals((long)1L, (long)this.sqliteHelper.select("SELECT * FROM atopic WHERE firstName='" + struct.getString("firstName") + "' and lastName='" + struct.getString("lastName") + "'", new SqliteHelper.ResultSetReadCallback(){

            @Override
            public void read(ResultSet rs) throws SQLException {
                Assert.assertEquals((Object)struct.getBoolean("bool"), (Object)rs.getBoolean("bool"));
                rs.getShort("short");
                Assert.assertTrue((boolean)rs.wasNull());
                Assert.assertEquals((long)struct.getInt8("byte").byteValue(), (long)rs.getByte("byte"));
                Assert.assertEquals((long)struct.getInt32("age").intValue(), (long)rs.getInt("age"));
                Assert.assertEquals((long)struct.getInt64("long"), (long)rs.getLong("long"));
                rs.getShort("float");
                Assert.assertTrue((boolean)rs.wasNull());
                Assert.assertEquals((double)struct.getFloat64("double"), (double)rs.getDouble("double"), (double)0.01);
            }
        }));
    }

    @Test
    public void retries() throws SQLException {
        int maxRetries = 2;
        int retryBackoffMs = 1000;
        Set<SinkRecord> records = Collections.singleton(new SinkRecord("stub", 0, null, null, null, null, 0L));
        final JdbcDbWriter mockWriter = (JdbcDbWriter)this.createMock(JdbcDbWriter.class);
        SinkTaskContext ctx = (SinkTaskContext)this.createMock(SinkTaskContext.class);
        mockWriter.write(records);
        EasyMock.expectLastCall().andThrow((Throwable)new SQLException()).times(3);
        ctx.timeout(1000L);
        EasyMock.expectLastCall().times(2);
        mockWriter.closeQuietly();
        EasyMock.expectLastCall().times(2);
        JdbcSinkTask task = new JdbcSinkTask(){

            void initWriter() {
                this.writer = mockWriter;
            }
        };
        task.initialize(ctx);
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("connection.url", "stub");
        props.put("max.retries", String.valueOf(2));
        props.put("retry.backoff.ms", String.valueOf(1000));
        task.start(props);
        this.replayAll();
        try {
            task.put(records);
            Assert.fail();
        }
        catch (RetriableException retriableException) {
            // empty catch block
        }
        try {
            task.put(records);
            Assert.fail();
        }
        catch (RetriableException retriableException) {
            // empty catch block
        }
        try {
            task.put(records);
            Assert.fail();
        }
        catch (RetriableException e) {
            Assert.fail((String)"Non-retriable exception expected");
        }
        catch (ConnectException expected) {
            Assert.assertEquals(SQLException.class, expected.getCause().getClass());
        }
        this.verifyAll();
    }
}

