package io.confluent.connect.activemq;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.SerializationFeature;
import io.confluent.connect.jms.core.source.JmsSourceRecord;
import io.confluent.connect.jms.core.source.JsonDestination;
import io.confluent.connect.jms.core.source.JsonMessage;
import io.confluent.connect.jms.core.source.MockConnectionFactory;
import io.confluent.connect.utils.jackson.ObjectMapperFactory;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/confluent/connect/activemq/ActiveMQSourceTaskTest.class */
/**
 * Tests for {@link ActiveMQSourceTask} polling and commit handling.
 *
 * <p>Each test runs against a task whose {@code connectionFactory()} is replaced with a
 * {@link MockConnectionFactory}, and which is started with {@code batch.size=3} and
 * {@code max.pending.messages=5}.
 */
public class ActiveMQSourceTaskTest {
    ActiveMQSourceTask task;

    /**
     * Enables indented JSON output on the shared {@link ObjectMapperFactory} instance and
     * registers the JMS message and destination Jackson modules once for the whole class.
     */
    @BeforeClass
    public static void setupJacksonModules() {
        ObjectMapperFactory.INSTANCE.configure(SerializationFeature.INDENT_OUTPUT, true);
        ObjectMapperFactory.INSTANCE.registerModules(new Module[]{new JsonMessage.Module(), new JsonDestination.Module()});
    }

    /**
     * Creates a fresh task backed by a {@link MockConnectionFactory} and starts it with the
     * connector settings used by every test.
     */
    @Before
    public void setup() {
        this.task = new ActiveMQSourceTask() {
            protected ConnectionFactory connectionFactory() {
                return new MockConnectionFactory();
            }
        };
        HashMap<String, String> settings = new HashMap<>();
        settings.put("kafka.topic", "foo");
        settings.put("jms.destination.name", "foo");
        settings.put("activemq.url", "tcp://localhost:61616");
        settings.put("confluent.topic.bootstrap.servers", "localhost:123");
        settings.put("max.pending.messages", "5");
        settings.put("batch.size", "3");
        this.task.start(settings);
    }

    /**
     * Polls four consecutive batches and checks their sizes (3, 2, 3, 2) and acknowledgement
     * behaviour: only the final record of each 2-record batch is verified as acknowledged.
     *
     * <p>NOTE(review): the 3 + 2 split presumably follows from {@code batch.size=3} combined
     * with {@code max.pending.messages=5}, with the acknowledge happening once all five pending
     * records are committed - confirm against the task implementation.
     *
     * @throws InterruptedException if polling or committing a record is interrupted
     * @throws JMSException declared by {@link Message#acknowledge()} used in verification
     */
    @Test
    public void poll() throws InterruptedException, JMSException {
        pollAndCommitBatch(3, false);
        pollAndCommitBatch(2, true);
        pollAndCommitBatch(3, false);
        pollAndCommitBatch(2, true);
        this.task.stop();
    }

    /**
     * Commits every record of one polled batch, stops the task, and then calls
     * {@code commit()} again; the test passes as long as that late commit does not throw.
     *
     * @throws InterruptedException if polling or committing a record is interrupted
     * @throws JMSException declared for parity with the other tests
     */
    @Test
    public void commitAfterClose() throws InterruptedException, JMSException {
        List<SourceRecord> batch = this.task.poll();
        Assert.assertNotNull(batch);
        Assert.assertFalse(batch.isEmpty());
        for (SourceRecord sourceRecord : batch) {
            this.task.commitRecord(sourceRecord);
        }
        this.task.commit();
        this.task.stop();
        this.task.commit();
    }

    /**
     * Polls one batch, asserts its size, commits each record in order while verifying the
     * acknowledgement state of its JMS message, and finally commits the task.
     *
     * @param expectedSize number of records the poll must return
     * @param lastRecordAcknowledged whether the last record's message must have been
     *     acknowledged at least once after its commit; every other record's message must
     *     never have been acknowledged
     * @throws InterruptedException if polling or committing a record is interrupted
     * @throws JMSException declared by {@link Message#acknowledge()} used in verification
     */
    private void pollAndCommitBatch(int expectedSize, boolean lastRecordAcknowledged)
            throws InterruptedException, JMSException {
        List<SourceRecord> batch = this.task.poll();
        Assert.assertNotNull(batch);
        Assert.assertEquals(expectedSize, batch.size());
        int lastIndex = batch.size() - 1;
        for (int index = 0; index <= lastIndex; index++) {
            JmsSourceRecord jmsRecord = (JmsSourceRecord) batch.get(index);
            this.task.commitRecord(jmsRecord);
            // Verification happens right after each commit so that an early acknowledge is caught.
            boolean expectAcknowledge = lastRecordAcknowledged && index == lastIndex;
            ((Message) Mockito.verify(
                    jmsRecord.message,
                    expectAcknowledge ? Mockito.atLeastOnce() : Mockito.never())).acknowledge();
        }
        this.task.commit();
    }
}
