package io.confluent.connect.jms;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.SerializationFeature;
import io.confluent.connect.jms.core.source.JmsClientHelper;
import io.confluent.connect.jms.core.source.JmsSourceRecord;
import io.confluent.connect.jms.core.source.JsonDestination;
import io.confluent.connect.jms.core.source.JsonMessage;
import io.confluent.connect.jms.core.source.MockConnectionInitialContextFactory;
import io.confluent.connect.jms.core.source.MockConnectionInitialContextFactoryPermissive;
import io.confluent.connect.jms.core.source.RecordConverter;
import io.confluent.connect.utils.jackson.ObjectMapperFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/jms/JmsSourceTaskTest.class */
public class JmsSourceTaskTest {
    /** Logger for this test class; used to trace the size of each polled batch. */
    private static final Logger log = LoggerFactory.getLogger(JmsSourceTaskTest.class);
    /** Task under test; a fresh instance is assigned in {@code beforeEach()}. */
    JmsSourceTask task;
    /** Settings handed to {@code task.start(...)}; rebuilt in {@code beforeEach()}. */
    Map<String, String> settings;

    /* loaded from: input_file:io/confluent/connect/jms/JmsSourceTaskTest$ConnectionResettingJmsSourceTask.class */
    /**
     * {@link JmsSourceTask} subclass used by the retry tests: both
     * {@code closeConnectionBeforeRetry} and {@code isRetriable} answer
     * {@code true} for any throwable.
     */
    private static class ConnectionResettingJmsSourceTask extends JmsSourceTask {
        private ConnectionResettingJmsSourceTask() {
            // Instantiated only by the enclosing test class.
        }

        /** Always answers {@code true}, regardless of the failure passed in. */
        protected boolean closeConnectionBeforeRetry(Throwable failure) {
            return true;
        }

        /** Always answers {@code true}, regardless of the failure passed in. */
        protected boolean isRetriable(Throwable failure) {
            return true;
        }
    }

    /**
     * One-time setup of the shared {@code ObjectMapperFactory.INSTANCE}: enables
     * {@code INDENT_OUTPUT} and registers the {@code JsonMessage} and
     * {@code JsonDestination} modules.
     */
    @BeforeClass
    public static void setupJacksonModules() {
        ObjectMapperFactory.INSTANCE.configure(SerializationFeature.INDENT_OUTPUT, true);
        final Module[] jmsModules = {new JsonMessage.Module(), new JsonDestination.Module()};
        ObjectMapperFactory.INSTANCE.registerModules(jmsModules);
    }

    /**
     * Creates a fresh task and a baseline settings map before every test.
     * Individual tests adjust the map before calling {@code init(...)}.
     */
    @Before
    public void beforeEach() {
        this.task = new JmsSourceTask();

        final Map<String, String> defaults = new HashMap<>();
        // Kafka / JMS endpoints
        defaults.put("kafka.topic", "foo");
        defaults.put("jms.destination.name", "foo");
        // JNDI: the mock initial-context factory stands in for a real provider
        defaults.put("java.naming.factory.initial", MockConnectionInitialContextFactory.class.getName());
        defaults.put("java.naming.provider.url", "tcp://localhost");
        defaults.put("confluent.topic.bootstrap.servers", "localhost:123");
        // Batching limits
        defaults.put("max.pending.messages", "5");
        defaults.put("batch.size", "3");
        this.settings = defaults;
    }

    /** Drops the per-test task and settings references after every test. */
    @After
    public void afterEach() {
        this.settings = null;
        this.task = null;
    }

    /**
     * Finalises the settings and starts the task.
     *
     * @param connectorSuffix appended to {@code "Connector-"} to form the {@code name} setting
     */
    private void init(String connectorSuffix) {
        final String connectorName = "Connector-" + connectorSuffix;
        this.settings.put("name", connectorName);
        this.settings.put("task.jms.id", "1");
        this.task.start(this.settings);
    }

    /**
     * Overrides the batching limits in the settings map.
     *
     * @param maxPendingMessages value stored under {@code max.pending.messages}
     * @param batchSize          value stored under {@code batch.size}
     */
    protected void configureTask(int maxPendingMessages, int batchSize) {
        final String pendingLimit = String.valueOf(maxPendingMessages);
        final String batchLimit = String.valueOf(batchSize);
        this.settings.put("max.pending.messages", pendingLimit);
        this.settings.put("batch.size", batchLimit);
    }

    /** Stores a fixed value under the {@code batch.size.seed} setting. */
    protected void configureTaskWithRandomBatchSizes() {
        final String seed = "120230";
        this.settings.put("batch.size.seed", seed);
    }

    /**
     * Stores the given sizes, comma-separated and in order, under the
     * {@code batch.sizes} setting.
     *
     * @param batchSizes sizes to encode, e.g. {@code 658, 0, 3528} becomes {@code "658,0,3528"}
     */
    protected void configureTaskWithConsumedBatchSizes(int... batchSizes) {
        final String encoded = IntStream.of(batchSizes)
                .mapToObj(String::valueOf)
                .collect(Collectors.joining(","));
        this.settings.put("batch.sizes", encoded);
    }

    /**
     * With a client helper whose {@code receive} throws {@link JMSException},
     * {@code poll()} must throw a {@link ConnectException} whose cause is itself
     * a {@link ConnectException}, and the helper must not report itself closed.
     */
    @Test
    public void jmsExceptionOnPollThrows() throws Exception {
        init("jmsExceptionOnPollThrows");

        final JmsClientHelper failingHelper = Mockito.mock(JmsClientHelper.class);
        this.task.clientHelper = failingHelper;
        Mockito.when(failingHelper.receive(ArgumentMatchers.anyLong())).thenThrow(JMSException.class);

        try {
            this.task.poll();
            // Reaching this line means poll() swallowed the failure.
            Assert.fail();
        } catch (ConnectException thrown) {
            final Class<?> causeType = thrown.getCause().getClass();
            Assert.assertEquals(ConnectException.class, causeType);
            Assert.assertFalse(this.task.clientHelper.isClosed());
        }
    }

    /**
     * Uses the always-retry task variant with a helper that fails twice and then
     * delivers a message. After a single {@code poll()} the helper must have been
     * closed twice and asked to receive three times.
     */
    @Test
    public void jmsExceptionRetryAndCloseConnection() throws Exception {
        this.task = new ConnectionResettingJmsSourceTask();
        init("jmsExceptionRetryAndCloseConnection");

        final JmsClientHelper flakyHelper = Mockito.mock(JmsClientHelper.class);
        this.task.clientHelper = flakyHelper;
        this.task.converter = Mockito.mock(RecordConverter.class);
        Mockito.when(this.task.converter.record((Message) ArgumentMatchers.any())).thenReturn(null);

        // Two failures followed by one successful delivery.
        Mockito.when(flakyHelper.receive(ArgumentMatchers.anyLong()))
                .thenThrow(new JMSException("exception"))
                .thenThrow(new JMSException("exception!"))
                .thenReturn(Mockito.mock(Message.class));

        this.task.poll();

        Mockito.verify(flakyHelper, Mockito.times(2)).close();
        Mockito.verify(flakyHelper, Mockito.times(3)).receive(ArgumentMatchers.anyLong());
    }

    /**
     * With {@code max.retry.time} set to {@code 0}, a {@link JMSException} from
     * {@code receive} must surface from {@code poll()} as a {@link ConnectException},
     * even on the always-retry task variant.
     */
    @Test(expected = ConnectException.class)
    public void jmsExceptionNoRetry() throws InterruptedException, JMSException {
        this.task = new ConnectionResettingJmsSourceTask();
        this.settings.put("max.retry.time", "0");
        init("jmsExceptionNoRetry");

        final JmsClientHelper failingHelper = Mockito.mock(JmsClientHelper.class);
        this.task.clientHelper = failingHelper;
        this.task.converter = Mockito.mock(RecordConverter.class);
        Mockito.when(this.task.converter.record((Message) ArgumentMatchers.any())).thenReturn(null);
        Mockito.when(failingHelper.receive(ArgumentMatchers.anyLong()))
                .thenThrow(new JMSException("exception"));

        this.task.poll();
    }

    /**
     * An exception stored in the task's {@code commitException} holder must make
     * the next {@code poll()} throw a {@link ConnectException}.
     */
    @Test(expected = ConnectException.class)
    public void throwCommitException() throws InterruptedException {
        init("throwCommitException");
        final JMSException storedFailure = new JMSException("sample exception!");
        this.task.commitException.set(storedFailure);
        this.task.poll();
    }

    /**
     * With {@code max.pending.messages=5} and {@code batch.size=3}, polls arrive
     * as alternating batches of 3 and 2 records. Committing records of a 3-record
     * batch must never acknowledge their messages; in a 2-record batch only the
     * second (last) record's message is acknowledged. Two such rounds are run.
     */
    @Test
    public void poll() throws InterruptedException, JMSException {
        configureTask(5, 3);
        init("poll");

        for (int round = 0; round < 2; round++) {
            pollCommitAndVerifyAcks(3, false);
            pollCommitAndVerifyAcks(2, true);
        }

        this.task.stop();
    }

    /**
     * Polls one batch, checks its size, commits each record in order while
     * verifying the acknowledge expectation on its message, then calls
     * {@code commit()} on the task.
     *
     * @param expectedSize           exact number of records the poll must return
     * @param lastRecordAcknowledged whether the last record's message must have been
     *                               acknowledged at least once; all other messages
     *                               must never have been acknowledged
     */
    private void pollCommitAndVerifyAcks(int expectedSize, boolean lastRecordAcknowledged)
            throws InterruptedException, JMSException {
        final List<SourceRecord> batch = this.task.poll();
        Assert.assertNotNull(batch);
        Assert.assertEquals(expectedSize, batch.size());

        final int lastIndex = expectedSize - 1;
        for (int index = 0; index < expectedSize; index++) {
            final JmsSourceRecord committed = (JmsSourceRecord) batch.get(index);
            this.task.commitRecord(committed);
            final boolean expectAck = lastRecordAcknowledged && index == lastIndex;
            ((Message) Mockito.verify(
                    committed.message,
                    expectAck ? Mockito.atLeastOnce() : Mockito.never())).acknowledge();
        }

        this.task.commit();
    }

    /**
     * Polls one non-empty batch, commits every record, then calls
     * {@code commit()} both before and after {@code stop()}; the test passes if
     * none of these calls throws.
     */
    @Test
    public void commitAfterClose() throws InterruptedException, JMSException {
        configureTask(5, 3);
        init("commitAfterClose");

        final List<SourceRecord> batch = this.task.poll();
        Assert.assertNotNull(batch);
        Assert.assertFalse(batch.isEmpty());

        for (SourceRecord polled : batch) {
            this.task.commitRecord(polled);
        }

        this.task.commit();
        this.task.stop();
        // Second commit happens on the already-stopped task.
        this.task.commit();
    }

    /**
     * With {@code max.pending.messages=5} and {@code batch.size=3}, the task
     * hands out 3 + 2 records and then returns {@code null} from {@code poll()}
     * until those records are committed. After committing, the next poll must
     * again return a 3-record batch.
     */
    @Test
    public void testNoRecvTillCommit() throws InterruptedException {
        configureTask(5, 3);
        init("testNoRecvTillCommit");
        final List<SourceRecord> uncommitted = new ArrayList<>();

        List<SourceRecord> batch = this.task.poll();
        Assert.assertNotNull(batch);
        Assert.assertEquals(3L, batch.size());
        uncommitted.addAll(batch);

        batch = this.task.poll();
        Assert.assertNotNull(batch);
        Assert.assertEquals(2L, batch.size());
        uncommitted.addAll(batch);

        // Pending limit reached: nothing more is delivered until a commit.
        Assert.assertNull(this.task.poll());
        Assert.assertNull(this.task.poll());

        for (SourceRecord polled : uncommitted) {
            this.task.commitRecord(polled);
        }
        this.task.commit();

        // The result must be kept in a variable so its size can be checked;
        // the previous code referenced an undefined identifier here.
        batch = this.task.poll();
        Assert.assertNotNull(batch);
        Assert.assertEquals(3L, batch.size());
        this.task.stop();
    }

    /**
     * Drives the task with {@code max.pending.messages=4096}, {@code batch.size=1024}
     * and a scripted sequence of consumed batch sizes that includes empty receives.
     * Each step checks the size of the polled batch and, via {@code assertPending} /
     * {@code assertNoPending}, the task's pending-message bookkeeping before and/or
     * after the records are committed.
     */
    @Test
    public void avoidPollAndCommitWithIntermittentReceives() throws InterruptedException {
        configureTask(4096, 1024);
        configureTaskWithConsumedBatchSizes(658, 0, 0, 0, 0, 0, 0, 3528, 3960, 6144, 20480);
        init("avoidPollAndCommitWithIntermittentReceives");

        // Scripted size 658: one short batch.
        commitAll(pollExpecting(658));

        // Six scripted empty receives: each poll yields nothing.
        for (int i = 0; i < 6; i++) {
            Assert.assertNull(this.task.poll());
        }

        // Scripted size 3528 = 3 * 1024 + 456.
        for (int i = 0; i < 3; i++) {
            commitAll(pollExpecting(1024));
        }
        commitAll(pollExpecting(456));
        assertNoPending();

        // Scripted size 3960 = 3 * 1024 + 888.
        commitAll(pollExpecting(1024));
        assertPending(1024, 0, false);
        commitAll(pollExpecting(1024));
        assertPending(2048, 0, false);
        commitAll(pollExpecting(1024));
        assertPending(3072, 0, false);
        commitAll(pollExpecting(888));
        assertNoPending();

        // Scripted size 6144 = 6 * 1024; the fourth batch reaches the 4096 limit.
        commitAll(pollExpecting(1024));
        assertPending(1024, 0, false);
        commitAll(pollExpecting(1024));
        assertPending(2048, 0, false);
        commitAll(pollExpecting(1024));
        assertPending(3072, 0, false);

        List<SourceRecord> batch = pollExpecting(1024);
        assertPending(4096, 1024, true);
        commitAll(batch);
        assertNoPending();

        batch = pollExpecting(1024);
        assertPending(1024, 1024, false);
        commitAll(batch);
        assertPending(1024, 0, false);

        batch = pollExpecting(1024);
        assertPending(2048, 1024, false);
        commitAll(batch);
        assertPending(2048, 0, false);

        // An empty poll here flips the task into the waiting state.
        Assert.assertNull(this.task.poll());
        assertPending(2048, 0, true);

        // Scripted size 20480: only the first five batches are exercised.
        batch = pollExpecting(1024);
        assertPending(3072, 1024, true);
        commitAll(batch);
        assertNoPending();

        batch = pollExpecting(1024);
        assertPending(1024, 1024, false);
        commitAll(batch);
        assertPending(1024, 0, false);

        batch = pollExpecting(1024);
        assertPending(2048, 1024, false);
        commitAll(batch);
        assertPending(2048, 0, false);

        batch = pollExpecting(1024);
        assertPending(3072, 1024, false);
        commitAll(batch);
        assertPending(3072, 0, false);

        batch = pollExpecting(1024);
        assertPending(4096, 1024, true);
        commitAll(batch);
        assertNoPending();

        this.task.stop();
    }

    /**
     * Polls the task once, asserts the batch is non-null with exactly
     * {@code expectedSize} records, and logs the batch size at debug level.
     *
     * @param expectedSize exact number of records the poll must return
     * @return the polled batch, not yet committed
     */
    private List<SourceRecord> pollExpecting(int expectedSize) throws InterruptedException {
        final List<SourceRecord> records = this.task.poll();
        Assert.assertNotNull(records);
        Assert.assertEquals(expectedSize, records.size());
        log.debug("{} About to send {} records to Kafka", "WorkerSourceTask{id=PROD_MECH_TXN_EDF-0}", records.size());
        return records;
    }

    /** Passes every record of the batch, in order, to the task's {@code commitRecord}. */
    private void commitAll(List<SourceRecord> records) {
        records.forEach(this.task::commitRecord);
    }

    /**
     * With the permissive mock context factory and {@code use.permissive.schema=true},
     * a poll must return exactly one record, and committing that record must
     * acknowledge its message.
     */
    @Test
    public void pollWithPermissiveSchema() throws InterruptedException, JMSException {
        this.settings.put("java.naming.factory.initial", MockConnectionInitialContextFactoryPermissive.class.getName());
        this.settings.put("use.permissive.schema", "true");
        // Use this test's own name, as every other test does; it previously
        // reused the name of commitAfterCloseWithPermissiveSchema.
        init("pollWithPermissiveSchema");

        final List<SourceRecord> batch = this.task.poll();
        Assert.assertNotNull(batch);
        Assert.assertEquals(1L, batch.size());

        final JmsSourceRecord onlyRecord = (JmsSourceRecord) batch.get(0);
        this.task.commitRecord(onlyRecord);
        ((Message) Mockito.verify(onlyRecord.message, Mockito.atLeastOnce())).acknowledge();

        this.task.commit();
        this.task.stop();
    }

    /**
     * Permissive-schema variant of the commit-after-stop scenario: polls a
     * non-empty batch, commits every record, then calls {@code commit()} before
     * and after {@code stop()}; passes if nothing throws.
     */
    @Test
    public void commitAfterCloseWithPermissiveSchema() throws InterruptedException, JMSException {
        final String permissiveFactory = MockConnectionInitialContextFactoryPermissive.class.getName();
        this.settings.put("java.naming.factory.initial", permissiveFactory);
        this.settings.put("use.permissive.schema", "true");
        init("commitAfterCloseWithPermissiveSchema");

        final List<SourceRecord> batch = this.task.poll();
        Assert.assertNotNull(batch);
        Assert.assertFalse(batch.isEmpty());

        for (SourceRecord polled : batch) {
            this.task.commitRecord(polled);
        }

        this.task.commit();
        this.task.stop();
        // Second commit happens on the already-stopped task.
        this.task.commit();
    }

    /** Returns the simple name of this object's runtime class. */
    @Override
    public String toString() {
        final Class<?> runtimeType = getClass();
        return runtimeType.getSimpleName();
    }

    /**
     * Asserts that the task has no pending messages, is not waiting for pending
     * messages to commit, and reports zero produced records.
     */
    private void assertNoPending() {
        final boolean nothingPending = this.task.getPendingMessages().isEmpty();
        Assert.assertTrue(nothingPending);

        final boolean waiting = this.task.isWaitingForPendingToCommit();
        Assert.assertFalse(waiting);

        Assert.assertEquals(0L, this.task.numProduced());
    }

    /**
     * Asserts the task's pending-message bookkeeping.
     *
     * @param expectedProduced expected value of {@code task.numProduced()}
     * @param expectedPending  expected size of {@code task.getPendingMessages()}
     * @param expectedWaiting  expected value of {@code task.isWaitingForPendingToCommit()}
     */
    private void assertPending(int expectedProduced, int expectedPending, boolean expectedWaiting) {
        // Checked in this order: pending count, produced count, waiting flag.
        Assert.assertEquals(expectedPending, this.task.getPendingMessages().size());
        Assert.assertEquals(expectedProduced, this.task.numProduced());
        final boolean waiting = this.task.isWaitingForPendingToCommit();
        Assert.assertEquals(expectedWaiting, waiting);
    }
}
