/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.jdbc.source;

import io.confluent.connect.jdbc.source.EmbeddedDerby;
import io.confluent.connect.jdbc.source.JdbcSourceTask;
import io.confluent.connect.jdbc.source.JdbcSourceTaskTestBase;
import io.confluent.connect.jdbc.source.TimestampIncrementingOffset;
import io.confluent.connect.jdbc.util.DateTimeUtils;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

/**
 * Exercises how the JDBC source task picks up rows across successive polls in each
 * mode selected by {@link #startTask(String, String, String, Long)}: bulk,
 * incrementing, timestamp and timestamp+incrementing, both against a single table
 * and against a custom join query, with and without previously stored offsets.
 *
 * <p>Most tests follow the same shape: record the expected offset-storage
 * interaction on the mocks, create and seed a Derby table, start the task, then
 * assert on the records (values, source offsets, topic) returned by each poll.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest({JdbcSourceTask.class})
@PowerMockIgnore({"javax.management.*"})
public class JdbcSourceTaskUpdateTest
extends JdbcSourceTaskTestBase {
    /** Source partition expected on records produced by a custom query. */
    private static final Map<String, String> QUERY_SOURCE_PARTITION = Collections.singletonMap("query", "query");

    /** Topic expected for records read from the single test table. */
    private static final String SINGLE_TABLE_TOPIC = "test-" + SINGLE_TABLE_NAME;

    /** Topic expected for records produced by a custom query (prefix only, no table name). */
    private static final String QUERY_TOPIC = "test-";

    /**
     * Stops the task and then runs the base-class tear-down.
     *
     * <p>The base tear-down runs in a {@code finally} block so that it is not skipped
     * when stopping the task fails, and a task that was never created is tolerated so
     * that a set-up failure is not masked by a {@link NullPointerException} here.
     */
    @Override
    @After
    public void tearDown() throws Exception {
        try {
            if (task != null) {
                task.stop();
            }
        } finally {
            super.tearDown();
        }
    }

    /**
     * Bulk mode: every poll is expected to return the full current table contents,
     * so repeated polls repeat rows, inserted rows appear and deleted rows vanish.
     */
    @Test
    public void testBulkPeriodicLoad() throws Exception {
        EmbeddedDerby.ColumnName column = new EmbeddedDerby.ColumnName("id");
        db.createTable(SINGLE_TABLE_NAME, "id", "INT NOT NULL");
        db.insert(SINGLE_TABLE_NAME, "id", 1);
        task.start(singleTableConfig());

        List<SourceRecord> records = task.poll();
        Assert.assertEquals(Collections.singletonMap(1, 1), countIntValues(records, "id"));
        assertRecordsTopic(records, SINGLE_TABLE_TOPIC);

        // Nothing changed: the same single row is expected again.
        records = task.poll();
        Assert.assertEquals(Collections.singletonMap(1, 1), countIntValues(records, "id"));
        assertRecordsTopic(records, SINGLE_TABLE_TOPIC);

        db.insert(SINGLE_TABLE_NAME, "id", 2);
        records = task.poll();
        Map<Integer, Integer> twoRecords = new HashMap<Integer, Integer>();
        twoRecords.put(1, 1);
        twoRecords.put(2, 1);
        Assert.assertEquals(twoRecords, countIntValues(records, "id"));
        assertRecordsTopic(records, SINGLE_TABLE_TOPIC);

        db.delete(SINGLE_TABLE_NAME, new EmbeddedDerby.EqualsCondition(column, 1));
        records = task.poll();
        Assert.assertEquals(Collections.singletonMap(2, 1), countIntValues(records, "id"));
        assertRecordsTopic(records, SINGLE_TABLE_TOPIC);
    }

    /** Starting in incrementing mode on a nullable {@code INT} column must fail. */
    @Test(expected=ConnectException.class)
    public void testIncrementingInvalidColumn() throws Exception {
        expectSingleTableWithoutOffsets();
        db.createTable(SINGLE_TABLE_NAME, "id", "INT");
        startTask(null, "id", null);
        // Only reached if the expected exception was not thrown.
        PowerMock.verifyAll();
    }

    /** Starting in timestamp mode on a nullable {@code TIMESTAMP} column must fail. */
    @Test(expected=ConnectException.class)
    public void testTimestampInvalidColumn() throws Exception {
        expectSingleTableWithoutOffsets();
        db.createTable(SINGLE_TABLE_NAME, "modified", "TIMESTAMP");
        startTask("modified", null, null);
        // Only reached if the expected exception was not thrown.
        PowerMock.verifyAll();
    }

    /** Incrementing mode with an explicitly named column: the second poll returns only the new ids. */
    @Test
    public void testManualIncrementing() throws Exception {
        expectSingleTableWithoutOffsets();
        db.createTable(SINGLE_TABLE_NAME, "id", "INT NOT NULL");
        db.insert(SINGLE_TABLE_NAME, "id", 1);
        startTask(null, "id", null);
        verifyIncrementingFirstPoll(SINGLE_TABLE_TOPIC);

        db.insert(SINGLE_TABLE_NAME, "id", 2);
        db.insert(SINGLE_TABLE_NAME, "id", 3);
        verifyPoll(2, "id", Arrays.asList(2, 3), false, true, false, SINGLE_TABLE_TOPIC);
        PowerMock.verifyAll();
    }

    /** Incrementing mode with an empty column name against a generated identity column. */
    @Test
    public void testAutoincrement() throws Exception {
        expectSingleTableWithoutOffsets();
        String extraColumn = "col";
        db.createTable(SINGLE_TABLE_NAME, "id", "INT NOT NULL GENERATED ALWAYS AS IDENTITY", extraColumn, "FLOAT");
        db.insert(SINGLE_TABLE_NAME, extraColumn, 32.4f);
        startTask(null, "", null);
        verifyIncrementingFirstPoll(SINGLE_TABLE_TOPIC);

        db.insert(SINGLE_TABLE_NAME, extraColumn, 33.4f);
        db.insert(SINGLE_TABLE_NAME, extraColumn, 35.4f);
        verifyPoll(2, "id", Arrays.asList(2, 3), false, true, false, SINGLE_TABLE_TOPIC);
        PowerMock.verifyAll();
    }

    /**
     * Timestamp mode: a later row whose timestamp equals the one already seen
     * (id 2 at 10 ms) is expected to be skipped; only ids 3 and 4 are returned.
     */
    @Test
    public void testTimestamp() throws Exception {
        expectSingleTableWithoutOffsets();
        db.createTable(SINGLE_TABLE_NAME, "modified", "TIMESTAMP NOT NULL", "id", "INT");
        db.insert(SINGLE_TABLE_NAME, "modified", utc(10L), "id", 1);
        startTask("modified", null, null);
        verifyTimestampFirstPoll(SINGLE_TABLE_TOPIC);

        db.insert(SINGLE_TABLE_NAME, "modified", utc(10L), "id", 2);
        db.insert(SINGLE_TABLE_NAME, "modified", utc(11L), "id", 3);
        db.insert(SINGLE_TABLE_NAME, "modified", utc(12L), "id", 4);
        verifyPoll(2, "id", Arrays.asList(3, 4), true, false, false, SINGLE_TABLE_TOPIC);
        PowerMock.verifyAll();
    }

    /**
     * Timestamp mode over two columns ("modified, created"), where {@code modified}
     * is nullable and rows without it are expected to be tracked by {@code created}.
     */
    @Test
    public void testMultiColumnTimestamp() throws Exception {
        expectSingleTableWithoutOffsets();
        db.createTable(SINGLE_TABLE_NAME, "modified", "TIMESTAMP", "created", "TIMESTAMP NOT NULL", "id", "INT");
        db.insert(SINGLE_TABLE_NAME, "created", utc(10L), "id", 1);
        startTask("modified, created", null, null);
        verifyMultiTimestampFirstPoll(SINGLE_TABLE_TOPIC);

        db.insert(SINGLE_TABLE_NAME, "modified", utc(13L), "created", utc(10L), "id", 2);
        db.insert(SINGLE_TABLE_NAME, "created", utc(11L), "id", 3);
        db.insert(SINGLE_TABLE_NAME, "created", utc(12L), "id", 4);
        verifyPoll(3, "id", Arrays.asList(2, 3, 4), false, false, true, SINGLE_TABLE_TOPIC);
        PowerMock.verifyAll();
    }

    /**
     * Timestamp mode with a non-zero {@code timestamp.delay.interval.ms}: rows stamped
     * about half a second in the future are expected to be held back until a later poll.
     */
    @Test
    public void testTimestampWithDelay() throws Exception {
        expectSingleTableWithoutOffsets();
        db.createTable(SINGLE_TABLE_NAME, "modified", "TIMESTAMP NOT NULL", "id", "INT");
        db.insert(SINGLE_TABLE_NAME, "modified", utc(10L), "id", 1);
        startTask("modified", null, null, 4L);
        verifyTimestampFirstPoll(SINGLE_TABLE_TOPIC);

        long currentTime = new Date().getTime();
        // NOTE(review): these rows use Timestamp.toString() (JVM-local time) rather than the
        // UTC formatter used elsewhere; presumably intentional so the values line up with the
        // database's own notion of "now" - confirm before changing.
        db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime).toString(), "id", 2);
        db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 1L).toString(), "id", 3);
        db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 500L).toString(), "id", 4);
        db.insert(SINGLE_TABLE_NAME, "modified", new Timestamp(currentTime + 501L).toString(), "id", 5);
        verifyPoll(2, "id", Arrays.asList(2, 3), true, false, false, SINGLE_TABLE_TOPIC);

        // Wait long enough for the two future-dated rows to fall inside the polling window.
        Thread.sleep(500L);
        verifyPoll(2, "id", Arrays.asList(4, 5), true, false, false, SINGLE_TABLE_TOPIC);
        PowerMock.verifyAll();
    }

    /**
     * Timestamp+incrementing mode: a row with the same timestamp but a larger id, and a
     * row with a smaller id but a later timestamp, are both expected on the second poll.
     */
    @Test
    public void testTimestampAndIncrementing() throws Exception {
        expectSingleTableWithoutOffsets();
        db.createTable(SINGLE_TABLE_NAME, "modified", "TIMESTAMP NOT NULL", "id", "INT NOT NULL");
        db.insert(SINGLE_TABLE_NAME, "modified", utc(10L), "id", 1);
        startTask("modified", "id", null);
        verifyIncrementingAndTimestampFirstPoll(SINGLE_TABLE_TOPIC);

        db.insert(SINGLE_TABLE_NAME, "modified", utc(10L), "id", 3);
        db.insert(SINGLE_TABLE_NAME, "modified", utc(11L), "id", 1);
        verifyPoll(2, "id", Arrays.asList(3, 1), true, true, false, SINGLE_TABLE_TOPIC);
        PowerMock.verifyAll();
    }

    /** Timestamp+incrementing mode with the two-column timestamp configuration. */
    @Test
    public void testMultiColumnTimestampAndIncrementing() throws Exception {
        expectSingleTableWithoutOffsets();
        db.createTable(SINGLE_TABLE_NAME, "modified", "TIMESTAMP", "created", "TIMESTAMP NOT NULL", "id", "INT NOT NULL");
        db.insert(SINGLE_TABLE_NAME, "created", utc(10L), "id", 1);
        startTask("modified, created", "id", null);
        verifyIncrementingAndMultiTimestampFirstPoll(SINGLE_TABLE_TOPIC);

        db.insert(SINGLE_TABLE_NAME, "created", utc(10L), "id", 3);
        db.insert(SINGLE_TABLE_NAME, "modified", utc(11L), "created", utc(10L), "id", 1);
        verifyPoll(2, "id", Arrays.asList(3, 1), false, true, true, SINGLE_TABLE_TOPIC);
        PowerMock.verifyAll();
    }

    /** Incrementing restore where the offset is stored only under the partition without a version. */
    @Test
    public void testManualIncrementingRestoreNoVersionOffset() throws Exception {
        TimestampIncrementingOffset offset = new TimestampIncrementingOffset(null, 1L);
        testManualIncrementingRestoreOffset(Collections.singletonMap(SINGLE_TABLE_PARTITION, offset.toMap()));
    }

    /** Incrementing restore where the offset is stored only under the versioned partition. */
    @Test
    public void testManualIncrementingRestoreVersionOneOffset() throws Exception {
        TimestampIncrementingOffset offset = new TimestampIncrementingOffset(null, 1L);
        testManualIncrementingRestoreOffset(Collections.singletonMap(SINGLE_TABLE_PARTITION_WITH_VERSION, offset.toMap()));
    }

    /**
     * Incrementing restore with offsets under both partitions; the assertions only hold
     * if the versioned partition's offset (1) wins over the older one (0).
     */
    @Test
    public void testManualIncrementingRestoreOffsetsWithMultipleProtocol() throws Exception {
        TimestampIncrementingOffset oldOffset = new TimestampIncrementingOffset(null, 0L);
        TimestampIncrementingOffset offset = new TimestampIncrementingOffset(null, 1L);
        Map<Map<String, String>, Map<String, Object>> offsets = new HashMap<Map<String, String>, Map<String, Object>>();
        offsets.put(SINGLE_TABLE_PARTITION_WITH_VERSION, offset.toMap());
        offsets.put(SINGLE_TABLE_PARTITION, oldOffset.toMap());
        testManualIncrementingRestoreOffset(offsets);
    }

    /**
     * Shared body of the manual-incrementing restore tests: with ids 1..3 present and the
     * given stored offsets, the first poll is expected to return only ids 2 and 3.
     *
     * @param offsets stored offsets to hand to the task, keyed by source partition
     */
    private void testManualIncrementingRestoreOffset(Map<Map<String, String>, Map<String, Object>> offsets) throws Exception {
        expectSingleTableWithOffsets(offsets);
        db.createTable(SINGLE_TABLE_NAME, "id", "INT NOT NULL");
        db.insert(SINGLE_TABLE_NAME, "id", 1);
        db.insert(SINGLE_TABLE_NAME, "id", 2);
        db.insert(SINGLE_TABLE_NAME, "id", 3);
        startTask(null, "id", null);
        verifyPoll(2, "id", Arrays.asList(2, 3), false, true, false, SINGLE_TABLE_TOPIC);
        PowerMock.verifyAll();
    }

    /** Auto-increment restore where the offset is stored only under the partition without a version. */
    @Test
    public void testAutoincrementRestoreNoVersionOffset() throws Exception {
        TimestampIncrementingOffset offset = new TimestampIncrementingOffset(null, 1L);
        testAutoincrementRestoreOffset(Collections.singletonMap(SINGLE_TABLE_PARTITION, offset.toMap()));
    }

    /** Auto-increment restore where the offset is stored only under the versioned partition. */
    @Test
    public void testAutoincrementRestoreVersionOneOffset() throws Exception {
        TimestampIncrementingOffset offset = new TimestampIncrementingOffset(null, 1L);
        testAutoincrementRestoreOffset(Collections.singletonMap(SINGLE_TABLE_PARTITION_WITH_VERSION, offset.toMap()));
    }

    /** Auto-increment restore with offsets under both partitions (versioned: 1, unversioned: 0). */
    @Test
    public void testAutoincrementRestoreOffsetsWithMultipleProtocol() throws Exception {
        TimestampIncrementingOffset oldOffset = new TimestampIncrementingOffset(null, 0L);
        TimestampIncrementingOffset offset = new TimestampIncrementingOffset(null, 1L);
        Map<Map<String, String>, Map<String, Object>> offsets = new HashMap<Map<String, String>, Map<String, Object>>();
        offsets.put(SINGLE_TABLE_PARTITION_WITH_VERSION, offset.toMap());
        offsets.put(SINGLE_TABLE_PARTITION, oldOffset.toMap());
        testAutoincrementRestoreOffset(offsets);
    }

    /**
     * Shared body of the auto-increment restore tests. The identity column is
     * {@code BIGINT}, which is why the expected ids are given as {@code Long} values.
     *
     * @param offsets stored offsets to hand to the task, keyed by source partition
     */
    private void testAutoincrementRestoreOffset(Map<Map<String, String>, Map<String, Object>> offsets) throws Exception {
        expectSingleTableWithOffsets(offsets);
        String extraColumn = "col";
        db.createTable(SINGLE_TABLE_NAME, "id", "BIGINT NOT NULL GENERATED ALWAYS AS IDENTITY", extraColumn, "FLOAT");
        db.insert(SINGLE_TABLE_NAME, extraColumn, 32.4f);
        db.insert(SINGLE_TABLE_NAME, extraColumn, 33.4f);
        db.insert(SINGLE_TABLE_NAME, extraColumn, 35.4f);
        startTask(null, "", null);
        verifyPoll(2, "id", Arrays.asList(2L, 3L), false, true, false, SINGLE_TABLE_TOPIC);
        PowerMock.verifyAll();
    }

    /** Timestamp restore where the offset is stored only under the partition without a version. */
    @Test
    public void testTimestampRestoreNoVersionOffset() throws Exception {
        TimestampIncrementingOffset offset = new TimestampIncrementingOffset(new Timestamp(10L), null);
        testTimestampRestoreOffset(Collections.singletonMap(SINGLE_TABLE_PARTITION, offset.toMap()));
    }

    /** Timestamp restore where the offset is stored only under the versioned partition. */
    @Test
    public void testTimestampRestoreVersionOneOffset() throws Exception {
        TimestampIncrementingOffset offset = new TimestampIncrementingOffset(new Timestamp(10L), null);
        testTimestampRestoreOffset(Collections.singletonMap(SINGLE_TABLE_PARTITION_WITH_VERSION, offset.toMap()));
    }

    /** Timestamp restore with offsets under both partitions (versioned: 10 ms, unversioned: 8 ms). */
    @Test
    public void testTimestampRestoreOffsetsWithMultipleProtocol() throws Exception {
        TimestampIncrementingOffset oldOffset = new TimestampIncrementingOffset(new Timestamp(8L), null);
        TimestampIncrementingOffset offset = new TimestampIncrementingOffset(new Timestamp(10L), null);
        Map<Map<String, String>, Map<String, Object>> offsets = new HashMap<Map<String, String>, Map<String, Object>>();
        offsets.put(SINGLE_TABLE_PARTITION_WITH_VERSION, offset.toMap());
        offsets.put(SINGLE_TABLE_PARTITION, oldOffset.toMap());
        testTimestampRestoreOffset(offsets);
    }

    /**
     * Shared body of the timestamp restore tests: with rows at 10, 11 and 12 ms and the
     * given stored offsets, the first poll is expected to return only ids 3 and 4.
     *
     * @param offsets stored offsets to hand to the task, keyed by source partition
     */
    private void testTimestampRestoreOffset(Map<Map<String, String>, Map<String, Object>> offsets) throws Exception {
        expectSingleTableWithOffsets(offsets);
        db.createTable(SINGLE_TABLE_NAME, "modified", "TIMESTAMP NOT NULL", "id", "INT");
        db.insert(SINGLE_TABLE_NAME, "modified", utc(10L), "id", 2);
        db.insert(SINGLE_TABLE_NAME, "modified", utc(11L), "id", 3);
        db.insert(SINGLE_TABLE_NAME, "modified", utc(12L), "id", 4);
        startTask("modified", null, null);
        verifyPoll(2, "id", Arrays.asList(3, 4), true, false, false, SINGLE_TABLE_TOPIC);
        PowerMock.verifyAll();
    }

    /** Timestamp+incrementing restore where the offset is stored only under the partition without a version. */
    @Test
    public void testTimestampAndIncrementingRestoreNoVersionOffset() throws Exception {
        TimestampIncrementingOffset offset = new TimestampIncrementingOffset(new Timestamp(10L), 3L);
        testTimestampAndIncrementingRestoreOffset(Collections.singletonMap(SINGLE_TABLE_PARTITION, offset.toMap()));
    }

    /** Timestamp+incrementing restore where the offset is stored only under the versioned partition. */
    @Test
    public void testTimestampAndIncrementingRestoreVersionOneOffset() throws Exception {
        TimestampIncrementingOffset offset = new TimestampIncrementingOffset(new Timestamp(10L), 3L);
        testTimestampAndIncrementingRestoreOffset(Collections.singletonMap(SINGLE_TABLE_PARTITION_WITH_VERSION, offset.toMap()));
    }

    /** Timestamp+incrementing restore with offsets under both partitions (versioned id 3, unversioned id 2). */
    @Test
    public void testTimestampAndIncrementingRestoreOffsetsWithMultipleProtocol() throws Exception {
        TimestampIncrementingOffset oldOffset = new TimestampIncrementingOffset(new Timestamp(10L), 2L);
        TimestampIncrementingOffset offset = new TimestampIncrementingOffset(new Timestamp(10L), 3L);
        Map<Map<String, String>, Map<String, Object>> offsets = new HashMap<Map<String, String>, Map<String, Object>>();
        offsets.put(SINGLE_TABLE_PARTITION_WITH_VERSION, offset.toMap());
        offsets.put(SINGLE_TABLE_PARTITION, oldOffset.toMap());
        testTimestampAndIncrementingRestoreOffset(offsets);
    }

    /**
     * Shared body of the timestamp+incrementing restore tests: rows at or before the
     * stored (10 ms, id 3) position are expected to be skipped, leaving ids 4, 5 and 1.
     *
     * @param offsets stored offsets to hand to the task, keyed by source partition
     */
    private void testTimestampAndIncrementingRestoreOffset(Map<Map<String, String>, Map<String, Object>> offsets) throws Exception {
        expectSingleTableWithOffsets(offsets);
        db.createTable(SINGLE_TABLE_NAME, "modified", "TIMESTAMP NOT NULL", "id", "INT NOT NULL");
        db.insert(SINGLE_TABLE_NAME, "modified", utc(9L), "id", 2);
        db.insert(SINGLE_TABLE_NAME, "modified", utc(10L), "id", 3);
        db.insert(SINGLE_TABLE_NAME, "modified", utc(11L), "id", 4);
        db.insert(SINGLE_TABLE_NAME, "modified", utc(12L), "id", 5);
        db.insert(SINGLE_TABLE_NAME, "modified", utc(13L), "id", 1);
        startTask("modified", "id", null);
        verifyPoll(3, "id", Arrays.asList(4, 5, 1), true, true, false, SINGLE_TABLE_TOPIC);
        PowerMock.verifyAll();
    }

    /**
     * Bulk mode over a custom join query: each poll is expected to return every joined
     * row, on the bare prefix topic and with the query source partition.
     */
    @Test
    public void testCustomQueryBulk() throws Exception {
        db.createTable(JOIN_TABLE_NAME, "user_id", "INT", "name", "VARCHAR(64)");
        db.insert(JOIN_TABLE_NAME, "user_id", 1, "name", "Alice");
        db.insert(JOIN_TABLE_NAME, "user_id", 2, "name", "Bob");
        db.createTable(SINGLE_TABLE_NAME, "id", "INT", "user_id", "INT");
        db.insert(SINGLE_TABLE_NAME, "id", 1, "user_id", 1);
        startTask(null, null, "SELECT \"test\".\"id\", \"test\".\"user_id\", \"users\".\"name\" FROM \"test\" JOIN \"users\" ON (\"test\".\"user_id\" = \"users\".\"user_id\")");

        List<SourceRecord> records = task.poll();
        Assert.assertEquals(1, records.size());
        Map<Integer, Integer> recordUserIdCounts = new HashMap<Integer, Integer>();
        recordUserIdCounts.put(1, 1);
        Assert.assertEquals(recordUserIdCounts, countIntValues(records, "id"));
        assertRecordsTopic(records, QUERY_TOPIC);
        assertRecordsSourcePartition(records, QUERY_SOURCE_PARTITION);

        db.insert(SINGLE_TABLE_NAME, "id", 2, "user_id", 1);
        db.insert(SINGLE_TABLE_NAME, "id", 3, "user_id", 2);
        db.insert(SINGLE_TABLE_NAME, "id", 4, "user_id", 2);
        records = task.poll();
        Assert.assertEquals(4, records.size());
        // Two rows per user are expected after the extra inserts.
        recordUserIdCounts = new HashMap<Integer, Integer>();
        recordUserIdCounts.put(1, 2);
        recordUserIdCounts.put(2, 2);
        Assert.assertEquals(recordUserIdCounts, countIntValues(records, "user_id"));
        assertRecordsTopic(records, QUERY_TOPIC);
        assertRecordsSourcePartition(records, QUERY_SOURCE_PARTITION);
    }

    /** Timestamp mode over a custom join query; mirrors {@link #testTimestamp()} on the bare prefix topic. */
    @Test
    public void testCustomQueryWithTimestamp() throws Exception {
        expectInitializeNoOffsets(Arrays.asList(JOIN_QUERY_PARTITION));
        PowerMock.replayAll();
        db.createTable(JOIN_TABLE_NAME, "user_id", "INT", "name", "VARCHAR(64)");
        db.insert(JOIN_TABLE_NAME, "user_id", 1, "name", "Alice");
        db.insert(JOIN_TABLE_NAME, "user_id", 2, "name", "Bob");
        db.createTable(SINGLE_TABLE_NAME, "modified", "TIMESTAMP NOT NULL", "id", "INT", "user_id", "INT");
        db.insert(SINGLE_TABLE_NAME, "modified", utc(10L), "id", 1, "user_id", 1);
        startTask("modified", null, "SELECT \"test\".\"modified\", \"test\".\"id\", \"test\".\"user_id\", \"users\".\"name\" FROM \"test\" JOIN \"users\" ON (\"test\".\"user_id\" = \"users\".\"user_id\")");
        verifyTimestampFirstPoll(QUERY_TOPIC);

        db.insert(SINGLE_TABLE_NAME, "modified", utc(10L), "id", 2, "user_id", 1);
        db.insert(SINGLE_TABLE_NAME, "modified", utc(11L), "id", 3, "user_id", 2);
        db.insert(SINGLE_TABLE_NAME, "modified", utc(12L), "id", 4, "user_id", 2);
        verifyPoll(2, "id", Arrays.asList(3, 4), true, false, false, QUERY_TOPIC);
        PowerMock.verifyAll();
    }

    /**
     * Records the offset-storage expectation for the single table (both the versioned
     * and the unversioned partition) with no stored offsets, then switches the mocks
     * to replay mode.
     */
    private void expectSingleTableWithoutOffsets() throws Exception {
        expectInitializeNoOffsets(Arrays.asList(SINGLE_TABLE_PARTITION_WITH_VERSION, SINGLE_TABLE_PARTITION));
        PowerMock.replayAll();
    }

    /**
     * Records the offset-storage expectation for the single table (both the versioned
     * and the unversioned partition) returning {@code offsets}, then switches the mocks
     * to replay mode.
     *
     * @param offsets stored offsets to return, keyed by source partition
     */
    private void expectSingleTableWithOffsets(Map<Map<String, String>, Map<String, Object>> offsets) throws Exception {
        expectInitialize(Arrays.asList(SINGLE_TABLE_PARTITION_WITH_VERSION, SINGLE_TABLE_PARTITION), offsets);
        PowerMock.replayAll();
    }

    /**
     * Formats an epoch-millisecond instant with the connector's UTC timestamp formatter.
     *
     * @param epochMillis milliseconds since the epoch
     * @return the formatted timestamp, suitable for inserting into a TIMESTAMP column
     */
    private static String utc(long epochMillis) {
        // Typed as Date so the same formatter overload is chosen as with an explicit Date argument.
        Date instant = new Timestamp(epochMillis);
        return DateTimeUtils.formatUtcTimestamp(instant);
    }

    /** Starts the task with no timestamp delay; see {@link #startTask(String, String, String, Long)}. */
    private void startTask(String timestampColumn, String incrementingColumn, String query) {
        startTask(timestampColumn, incrementingColumn, query, 0L);
    }

    /**
     * Initializes and starts the task from the single-table config, deriving the mode
     * from which column arguments are non-null.
     *
     * @param timestampColumn    value for {@code timestamp.column.name}, or {@code null} for none
     * @param incrementingColumn value for {@code incrementing.column.name}, or {@code null} for none;
     *                           an empty string is non-null and therefore still selects an incrementing mode
     * @param query              custom query; when non-null it is set and {@code tables} is blanked
     * @param delay              value for {@code timestamp.delay.interval.ms}; {@code null} means "0"
     */
    private void startTask(String timestampColumn, String incrementingColumn, String query, Long delay) {
        final String mode;
        if (timestampColumn != null && incrementingColumn != null) {
            mode = "timestamp+incrementing";
        } else if (timestampColumn != null) {
            mode = "timestamp";
        } else if (incrementingColumn != null) {
            mode = "incrementing";
        } else {
            mode = "bulk";
        }

        initializeTask();
        Map<String, String> taskConfig = singleTableConfig();
        taskConfig.put("mode", mode);
        if (query != null) {
            taskConfig.put("query", query);
            taskConfig.put("tables", "");
        }
        if (timestampColumn != null) {
            taskConfig.put("timestamp.column.name", timestampColumn);
        }
        if (incrementingColumn != null) {
            taskConfig.put("incrementing.column.name", incrementingColumn);
        }
        taskConfig.put("timestamp.delay.interval.ms", delay == null ? "0" : delay.toString());
        task.start(taskConfig);
    }

    /**
     * Polls once and asserts a single record with id 1, an incrementing offset of 1,
     * offsets matching the record ids, and the given topic.
     */
    private void verifyIncrementingFirstPoll(String topic) throws Exception {
        List<SourceRecord> records = task.poll();
        Assert.assertEquals(Collections.singletonMap(1, 1), countIntValues(records, "id"));
        Assert.assertEquals(Collections.singletonMap(1L, 1), countIntIncrementingOffsets(records, "id"));
        assertIncrementingOffsets(records);
        assertRecordsTopic(records, topic);
    }

    /**
     * Polls once and asserts a single record with id 1 whose {@code created} value is
     * 10 ms, with a matching multi-column timestamp offset and the given topic.
     *
     * @return the polled records, for further assertions by the caller
     */
    private List<SourceRecord> verifyMultiTimestampFirstPoll(String topic) throws Exception {
        List<SourceRecord> records = task.poll();
        Assert.assertEquals(1, records.size());
        Assert.assertEquals(Collections.singletonMap(1, 1), countIntValues(records, "id"));
        Assert.assertEquals(Collections.singletonMap(10L, 1), countTimestampValues(records, "created"));
        assertMultiTimestampOffsets(records);
        assertRecordsTopic(records, topic);
        return records;
    }

    /**
     * Polls once and asserts a single record with id 1 whose {@code modified} value is
     * 10 ms, with a matching timestamp offset and the given topic.
     *
     * @return the polled records, for further assertions by the caller
     */
    private List<SourceRecord> verifyTimestampFirstPoll(String topic) throws Exception {
        List<SourceRecord> records = task.poll();
        Assert.assertEquals(1, records.size());
        Assert.assertEquals(Collections.singletonMap(1, 1), countIntValues(records, "id"));
        Assert.assertEquals(Collections.singletonMap(10L, 1), countTimestampValues(records, "modified"));
        assertTimestampOffsets(records);
        assertRecordsTopic(records, topic);
        return records;
    }

    /** First-poll check for timestamp+incrementing mode: timestamp checks plus incrementing offsets. */
    private void verifyIncrementingAndTimestampFirstPoll(String topic) throws Exception {
        assertIncrementingOffsets(verifyTimestampFirstPoll(topic));
    }

    /** First-poll check for multi-column timestamp+incrementing mode. */
    private void verifyIncrementingAndMultiTimestampFirstPoll(String topic) throws Exception {
        assertIncrementingOffsets(verifyMultiTimestampFirstPoll(topic));
    }

    /**
     * Polls once and checks the returned records.
     *
     * @param numRecords            expected number of records
     * @param valueField            struct field whose values are compared against {@code values}
     * @param values                expected field values, each expected exactly once
     * @param timestampOffsets      whether to check offsets against the {@code modified} field
     * @param incrementingOffsets   whether to check offsets against the {@code id} field
     * @param multiTimestampOffsets whether to check offsets against {@code modified}/{@code created}
     * @param topic                 topic every record must carry
     */
    private <T> void verifyPoll(int numRecords, String valueField, List<T> values, boolean timestampOffsets, boolean incrementingOffsets, boolean multiTimestampOffsets, String topic) throws Exception {
        List<SourceRecord> records = task.poll();
        Assert.assertEquals(numRecords, records.size());

        Map<T, Integer> valueCounts = new HashMap<T, Integer>();
        for (T value : values) {
            valueCounts.put(value, 1);
        }
        Assert.assertEquals(valueCounts, countIntValues(records, valueField));

        if (timestampOffsets) {
            assertTimestampOffsets(records);
        }
        if (incrementingOffsets) {
            assertIncrementingOffsets(records);
        }
        if (multiTimestampOffsets) {
            assertMultiTimestampOffsets(records);
        }
        assertRecordsTopic(records, topic);
    }

    /**
     * Decodes the source offset carried by a record.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static TimestampIncrementingOffset offsetOf(SourceRecord record) {
        // Raw cast kept on purpose: the parameter type of fromMap is declared outside this
        // file, and the raw Map keeps the call compiling whichever Map parameterization it uses.
        return TimestampIncrementingOffset.fromMap((Map) record.sourceOffset());
    }

    /**
     * Counts how often each extracted value occurs across {@code records}.
     *
     * <p>What is extracted depends on {@code field}: the record key, the named struct
     * field, the named struct field as epoch milliseconds, the incrementing offset, or
     * the timestamp offset as epoch milliseconds.
     *
     * @param records   records to scan
     * @param field     which part of each record to extract
     * @param fieldName struct field name; only read for {@code VALUE} and {@code TIMESTAMP_VALUE}
     * @return occurrence count per extracted value
     * @throws RuntimeException if {@code field} is not one of the handled constants
     */
    private <T> Map<T, Integer> countInts(List<SourceRecord> records, Field field, String fieldName) {
        Map<T, Integer> result = new HashMap<T, Integer>();
        for (SourceRecord record : records) {
            Object extracted;
            switch (field) {
                case KEY: {
                    extracted = record.key();
                    break;
                }
                case VALUE: {
                    extracted = ((Struct) record.value()).get(fieldName);
                    break;
                }
                case TIMESTAMP_VALUE: {
                    Date rawTimestamp = (Date) ((Struct) record.value()).get(fieldName);
                    extracted = rawTimestamp.getTime();
                    break;
                }
                case INCREMENTING_OFFSET: {
                    extracted = offsetOf(record).getIncrementingOffset();
                    break;
                }
                case TIMESTAMP_OFFSET: {
                    Timestamp rawTimestamp = offsetOf(record).getTimestampOffset();
                    extracted = rawTimestamp.getTime();
                    break;
                }
                default: {
                    throw new RuntimeException("Invalid field");
                }
            }
            // The caller chooses T to match what the selected Field yields; erasure makes
            // this cast a no-op at runtime, exactly as in the untyped counting map.
            @SuppressWarnings("unchecked")
            T key = (T) extracted;
            Integer count = result.get(key);
            result.put(key, (count != null ? count : 0) + 1);
        }
        return result;
    }

    /** Counts occurrences of each value of struct field {@code fieldName}. */
    private Map<Integer, Integer> countIntValues(List<SourceRecord> records, String fieldName) {
        return countInts(records, Field.VALUE, fieldName);
    }

    /** Counts occurrences of each epoch-millisecond value of struct field {@code fieldName}. */
    private Map<Long, Integer> countTimestampValues(List<SourceRecord> records, String fieldName) {
        return countInts(records, Field.TIMESTAMP_VALUE, fieldName);
    }

    /** Counts occurrences of each incrementing offset; {@code fieldName} is passed through but not read for this case. */
    private Map<Long, Integer> countIntIncrementingOffsets(List<SourceRecord> records, String fieldName) {
        return countInts(records, Field.INCREMENTING_OFFSET, fieldName);
    }

    /** Asserts that each record's incrementing offset equals its {@code id} field (Integer or Long). */
    private void assertIncrementingOffsets(List<SourceRecord> records) {
        for (SourceRecord record : records) {
            Object incrementing = ((Struct) record.value()).get("id");
            long incrementingValue = incrementing instanceof Integer
                ? ((Integer) incrementing).longValue()
                : ((Long) incrementing).longValue();
            long offsetValue = offsetOf(record).getIncrementingOffset();
            Assert.assertEquals(incrementingValue, offsetValue);
        }
    }

    /** Asserts that each record's timestamp offset equals its {@code modified} field. */
    private void assertTimestampOffsets(List<SourceRecord> records) {
        for (SourceRecord record : records) {
            Timestamp timestampValue = (Timestamp) ((Struct) record.value()).get("modified");
            Timestamp offsetValue = offsetOf(record).getTimestampOffset();
            Assert.assertEquals(timestampValue, offsetValue);
        }
    }

    /**
     * Asserts that each record's timestamp offset equals its {@code modified} field,
     * falling back to {@code created} when {@code modified} is null.
     */
    private void assertMultiTimestampOffsets(List<SourceRecord> records) {
        for (SourceRecord record : records) {
            Struct value = (Struct) record.value();
            Timestamp timestampValue = (Timestamp) value.get("modified");
            if (timestampValue == null) {
                timestampValue = (Timestamp) value.get("created");
            }
            Timestamp offsetValue = offsetOf(record).getTimestampOffset();
            Assert.assertEquals(timestampValue, offsetValue);
        }
    }

    /** Asserts that every record carries the given topic. */
    private void assertRecordsTopic(List<SourceRecord> records, String topic) {
        for (SourceRecord record : records) {
            Assert.assertEquals(topic, record.topic());
        }
    }

    /** Asserts that every record carries the given source partition. */
    private void assertRecordsSourcePartition(List<SourceRecord> records, Map<String, String> partition) {
        for (SourceRecord record : records) {
            Assert.assertEquals(partition, record.sourcePartition());
        }
    }

    /** Selects which part of a record {@code countInts} extracts. */
    private static enum Field {
        KEY,
        VALUE,
        TIMESTAMP_VALUE,
        INCREMENTING_OFFSET,
        TIMESTAMP_OFFSET;

    }
}

