package io.confluent.connect.activemq;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.collect.ImmutableMap;
import io.confluent.connect.jms.JmsSourceRecord;
import io.confluent.connect.jms.JsonDestination;
import io.confluent.connect.jms.JsonMessage;
import io.confluent.connect.jms.MockConnectionFactory;
import io.confluent.connect.utils.jackson.ObjectMapperFactory;
import java.util.List;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:io/confluent/connect/activemq/ActiveMQSourceTaskTest.class */
public class ActiveMQSourceTaskTest {
    ActiveMQSourceTask task;

    @BeforeClass
    public static void setupJacksonModules() {
        ObjectMapperFactory.INSTANCE.configure(SerializationFeature.INDENT_OUTPUT, true);
        ObjectMapperFactory.INSTANCE.registerModules(new Module[]{new JsonMessage.Module(), new JsonDestination.Module()});
    }

    @Before
    public void setup() {
        this.task = new ActiveMQSourceTask() { // from class: io.confluent.connect.activemq.ActiveMQSourceTaskTest.1
            protected ConnectionFactory connectionFactory() {
                return new MockConnectionFactory();
            }
        };
        this.task.start(ImmutableMap.of("kafka.topic", "foo", "jms.destination.name", "foo", "activemq.url", "tcp://localhost:61616", "confluent.topic.bootstrap.servers", "localhost:123"));
    }

    @Test
    public void poll() throws InterruptedException, JMSException {
        List<JmsSourceRecord> poll = this.task.poll();
        Assert.assertNotNull(poll);
        Assert.assertFalse(poll.isEmpty());
        for (JmsSourceRecord jmsSourceRecord : poll) {
            this.task.commitRecord(jmsSourceRecord);
            ((Message) Mockito.verify(jmsSourceRecord.message, Mockito.atLeastOnce())).acknowledge();
        }
        this.task.commit();
    }

    @Test
    public void commitAfterClose() throws InterruptedException, JMSException {
        List<JmsSourceRecord> poll = this.task.poll();
        Assert.assertNotNull(poll);
        Assert.assertFalse(poll.isEmpty());
        for (JmsSourceRecord jmsSourceRecord : poll) {
            this.task.commitRecord(jmsSourceRecord);
            ((Message) Mockito.verify(jmsSourceRecord.message, Mockito.atLeastOnce())).acknowledge();
        }
        this.task.commit();
        this.task.stop();
        this.task.commit();
    }
}
