package io.confluent.connect.jdbc.source;

import io.confluent.connect.jdbc.util.CachedConnectionProvider;
import java.sql.Connection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*"})
/* loaded from: input_file:io/confluent/connect/jdbc/source/JdbcSourceTaskLifecycleTest.class */
public class JdbcSourceTaskLifecycleTest extends JdbcSourceTaskTestBase {

    @Mock
    private CachedConnectionProvider mockCachedConnectionProvider;

    @Mock
    private Connection conn;

    /**
     * Starting the task with a configuration that lacks {@code connection.url} is expected to
     * fail with a {@link ConnectException}.
     */
    @Test(expected = ConnectException.class)
    public void testMissingParentConfig() {
        final Map<String, String> props = singleTableConfig();
        // Remove only the setting under test; the rest of the fixture configuration is kept.
        props.remove("connection.url");
        task.start(props);
    }

    /**
     * Starting the task with a configuration that lacks {@code tables} is expected to fail
     * with a {@link ConnectException}.
     */
    @Test(expected = ConnectException.class)
    public void testMissingTables() {
        final Map<String, String> props = singleTableConfig();
        // Remove only the setting under test; the rest of the fixture configuration is kept.
        props.remove("tables");
        task.start(props);
    }

    /**
     * Starts and polls the task on a worker thread, stops it from the test thread, and then
     * verifies the mock expectations (including the recorded {@code close()} call on the
     * connection provider).
     *
     * <p>The hand-off between the two threads uses a {@link CountDownLatch} and the worker's
     * {@link Future} instead of unguarded {@code wait()}/{@code notifyAll()}:
     * <ul>
     *   <li>every wait is bounded, so a stuck worker fails the test instead of hanging it;</li>
     *   <li>an exception thrown by {@code start()} or {@code poll()} on the worker is rethrown
     *       on the test thread instead of being silently dropped with the {@code Future};</li>
     *   <li>the test only proceeds to verification once a {@code poll()} that <em>began</em>
     *       after {@code stop()} returned has completed, and the worker has exited.</li>
     * </ul>
     *
     * @throws Exception if fixture setup fails, the worker thread fails, or a wait times out
     */
    @Test
    public void testStartStopDifferentThreads() throws Exception {
        final long timeoutSeconds = 30L;

        db.createTable(SINGLE_TABLE_NAME, "id", "INT");
        task = new JdbcSourceTask(time) {
            @Override
            protected CachedConnectionProvider connectionProvider(int i, long j) {
                // Hand the task the mock so that close() calls can be verified.
                return JdbcSourceTaskLifecycleTest.this.mockCachedConnectionProvider;
            }
        };

        // Record phase: any number of getConnection() calls, plus the recorded close() call.
        EasyMock.expect(mockCachedConnectionProvider.getConnection())
                .andReturn(db.getConnection())
                .anyTimes();
        mockCachedConnectionProvider.close();
        PowerMock.expectLastCall();
        PowerMock.replayAll();

        final ExecutorService pollExecutor = Executors.newSingleThreadExecutor();
        final CountDownLatch firstPollDone = new CountDownLatch(1);
        final AtomicBoolean keepPolling = new AtomicBoolean(true);
        final AtomicBoolean stopReturned = new AtomicBoolean(false);
        try {
            final Future<?> poller = pollExecutor.submit(() -> {
                task.start(singleTableConfig());
                while (keepPolling.get()) {
                    // Sampled before polling: "true" means this entire poll ran after stop().
                    final boolean beganAfterStop = stopReturned.get();
                    task.poll();
                    firstPollDone.countDown();
                    if (beganAfterStop) {
                        break;
                    }
                }
                return null;
            });

            // Wait for the first completed poll, checking regularly whether the worker died so
            // that its failure is reported promptly rather than as a generic timeout.
            final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSeconds);
            while (!firstPollDone.await(50L, TimeUnit.MILLISECONDS)) {
                if (poller.isDone()) {
                    poller.get(); // rethrows the worker's failure wrapped in ExecutionException
                    Assert.fail("Poller thread exited before completing a poll");
                }
                Assert.assertTrue("Timed out waiting for the first poll",
                        System.nanoTime() - deadline < 0L);
            }

            task.stop();
            stopReturned.set(true);

            // Returns once the worker has finished a poll that began after stop() returned;
            // throws if the worker failed or did not finish in time.
            poller.get(timeoutSeconds, TimeUnit.SECONDS);
        } finally {
            // On the failure path the worker may still be looping: release and interrupt it.
            keepPolling.set(false);
            pollExecutor.shutdownNow();
        }
        PowerMock.verifyAll();
    }

    /**
     * Starts and then stops the task on the calling thread, and verifies the expectations
     * recorded on the mock connection provider: one {@code getConnection()} expectation and
     * one {@code close()} expectation.
     */
    @Test
    public void testStartStopSameThread() {
        // Subclass the task so it uses the mock provider instead of building its own.
        task = new JdbcSourceTask(time) {
            protected CachedConnectionProvider connectionProvider(int i, long j) {
                return JdbcSourceTaskLifecycleTest.this.mockCachedConnectionProvider;
            }
        };

        // Record phase.
        EasyMock.expect(mockCachedConnectionProvider.getConnection())
                .andReturn(db.getConnection());
        mockCachedConnectionProvider.close();
        PowerMock.expectLastCall();
        PowerMock.replayAll();

        // Exercise phase.
        task.start(singleTableConfig());
        task.stop();

        PowerMock.verifyAll();
    }

    /**
     * Polls three times against a table holding one row and checks the test clock after each
     * poll: unchanged after the first, 5000 ms later after the second, and 10000 ms later
     * after the third.
     *
     * @throws Exception if fixture setup or polling fails
     */
    @Test
    public void testPollInterval() throws Exception {
        db.createTable(SINGLE_TABLE_NAME, "id", "INT");
        db.insert(SINGLE_TABLE_NAME, "id", 1);

        final long start = time.milliseconds();
        task.start(singleTableConfig());

        // Expected clock offset (ms) from the start time after poll #1, #2 and #3.
        final long[] expectedOffsets = {0L, 5000L, 10000L};
        for (final long offset : expectedOffsets) {
            task.poll();
            Assert.assertEquals(start + offset, time.milliseconds());
        }

        task.stop();
    }

    /**
     * With {@code batch.max.rows} set to {@code 1}, two inserted rows are expected to come
     * back as two single-record polls without the test clock moving; after the third poll
     * the clock is expected to be 5000 ms past the start time.
     *
     * @throws Exception if fixture setup or polling fails
     */
    @Test
    public void testSingleUpdateMultiplePoll() throws Exception {
        db.createTable(SINGLE_TABLE_NAME, "id", "INT");
        final Map<String, String> props = singleTableConfig();
        props.put("batch.max.rows", "1");

        final long start = time.milliseconds();
        task.start(props);
        db.insert(SINGLE_TABLE_NAME, "id", 1);
        db.insert(SINGLE_TABLE_NAME, "id", 2);

        // First row: delivered immediately, one record per batch.
        final List<SourceRecord> firstBatch = task.poll();
        Assert.assertEquals(start, time.milliseconds());
        Assert.assertEquals(1, firstBatch.size());

        // Second row: still no clock movement.
        final List<SourceRecord> secondBatch = task.poll();
        Assert.assertEquals(start, time.milliseconds());
        Assert.assertEquals(1, secondBatch.size());

        // Third poll: only the clock is checked here, not the returned records.
        task.poll();
        Assert.assertEquals(start + 5000, time.milliseconds());
    }

    /**
     * Polls a two-table configuration four times. The first two polls are expected to return
     * one record each (first table, then second table) with the test clock unchanged; the
     * next two polls are expected to return one record each in the same table order with the
     * clock 5000 ms past the start time.
     *
     * @throws Exception if fixture setup or polling fails
     */
    @Test
    public void testMultipleTables() throws Exception {
        db.createTable(SINGLE_TABLE_NAME, "id", "INT");
        db.createTable(SECOND_TABLE_NAME, "id", "INT");

        final long start = time.milliseconds();
        task.start(twoTableConfig());
        db.insert(SINGLE_TABLE_NAME, "id", 1);
        db.insert(SECOND_TABLE_NAME, "id", 2);

        // Round one: no clock movement, first table then second table.
        final List<SourceRecord> firstTableBatch = task.poll();
        Assert.assertEquals(start, time.milliseconds());
        Assert.assertEquals(1, firstTableBatch.size());
        Assert.assertEquals(SINGLE_TABLE_PARTITION, firstTableBatch.get(0).sourcePartition());

        final List<SourceRecord> secondTableBatch = task.poll();
        Assert.assertEquals(start, time.milliseconds());
        Assert.assertEquals(1, secondTableBatch.size());
        Assert.assertEquals(SECOND_TABLE_PARTITION, secondTableBatch.get(0).sourcePartition());

        // Round two: clock is 5000 ms later, same table order.
        final List<SourceRecord> firstTableAgain = task.poll();
        Assert.assertEquals(start + 5000, time.milliseconds());
        validatePollResultTable(firstTableAgain, 1, SINGLE_TABLE_NAME);

        final List<SourceRecord> secondTableAgain = task.poll();
        Assert.assertEquals(start + 5000, time.milliseconds());
        validatePollResultTable(secondTableAgain, 1, SECOND_TABLE_NAME);
    }

    /**
     * Polls a two-table configuration with {@code batch.max.rows} set to {@code 1} after
     * inserting two rows per table. Expected sequence: two single-record polls for the first
     * table and two for the second with the test clock unchanged, then the same four polls
     * again with the clock 5000 ms past the start time.
     *
     * @throws Exception if fixture setup or polling fails
     */
    @Test
    public void testMultipleTablesMultiplePolls() throws Exception {
        db.createTable(SINGLE_TABLE_NAME, "id", "INT");
        db.createTable(SECOND_TABLE_NAME, "id", "INT");
        final Map<String, String> props = twoTableConfig();
        props.put("batch.max.rows", "1");

        final long start = time.milliseconds();
        task.start(props);
        db.insert(SINGLE_TABLE_NAME, "id", 1);
        db.insert(SINGLE_TABLE_NAME, "id", 2);
        db.insert(SECOND_TABLE_NAME, "id", 3);
        db.insert(SECOND_TABLE_NAME, "id", 4);

        // Round one, first table: clock unchanged.
        for (int i = 0; i < 2; i++) {
            final List<SourceRecord> batch = task.poll();
            Assert.assertEquals(start, time.milliseconds());
            validatePollResultTable(batch, 1, SINGLE_TABLE_NAME);
        }
        // Round one, second table: clock unchanged.
        for (int i = 0; i < 2; i++) {
            final List<SourceRecord> batch = task.poll();
            Assert.assertEquals(start, time.milliseconds());
            validatePollResultTable(batch, 1, SECOND_TABLE_NAME);
        }
        // Round two, first table: clock is 5000 ms later.
        for (int i = 0; i < 2; i++) {
            final List<SourceRecord> batch = task.poll();
            Assert.assertEquals(start + 5000, time.milliseconds());
            validatePollResultTable(batch, 1, SINGLE_TABLE_NAME);
        }
        // Round two, second table: clock is 5000 ms later.
        for (int i = 0; i < 2; i++) {
            final List<SourceRecord> batch = task.poll();
            Assert.assertEquals(start + 5000, time.milliseconds());
            validatePollResultTable(batch, 1, SECOND_TABLE_NAME);
        }
    }

    /**
     * With two empty tables configured, a single poll is expected to return {@code null}.
     *
     * @throws Exception if fixture setup or polling fails
     */
    @Test
    public void testMultipleTablesNothingToDoReturns() throws Exception {
        // Both tables exist but hold no rows.
        db.createTable(SINGLE_TABLE_NAME, "id", "INT");
        db.createTable(SECOND_TABLE_NAME, "id", "INT");

        task.start(twoTableConfig());

        final Object polled = task.poll();
        Assert.assertNull(polled);
    }

    /**
     * Asserts that a poll result has the expected number of records and that every record's
     * source partition maps the key {@code "table"} to the expected value.
     *
     * @param list the records returned by a poll
     * @param i    the expected number of records
     * @param str  the expected value under the {@code "table"} key of each source partition
     */
    private static void validatePollResultTable(List<SourceRecord> list, int i, String str) {
        Assert.assertEquals(i, list.size());
        for (final SourceRecord sourceRecord : list) {
            final Object tableName = sourceRecord.sourcePartition().get("table");
            Assert.assertEquals(str, tableName);
        }
    }
}
