package io.confluent.connect.jms.core.source;

import io.confluent.connect.jms.core.source.BaseJmsSourceConnectorConfig;
import io.confluent.connect.jms.core.source.BaseJmsSourceTask;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

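/**
 * Base class for JMS source task integration tests. Concrete subclasses supply the connector
 * settings, the typed task configuration and the task instance through the abstract hooks
 * declared below.
 *
 * <p>Loaded from: input_file:io/confluent/connect/jms/core/source/BaseJmsSourceTaskIT.class
 */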
public abstract class BaseJmsSourceTaskIT<T extends BaseJmsSourceTask, C extends BaseJmsSourceConnectorConfig> {
    /** Logger named after the concrete (sub)class, since it is looked up via {@code getClass()}. */
    protected final Logger log = LoggerFactory.getLogger(getClass());
    /** Connector settings returned by {@link #taskConfiguration()}; assigned in {@link #setup()}. */
    protected Map<String, String> settings;
    /** Task under test; created and started by {@link #startTask()}, stopped in {@link #tearDown()}. */
    protected T task;
    /** Typed configuration built from {@link #settings} by {@link #createTaskConfig(Map)}. */
    protected C config;
    /** Messages that were sent and committed, in send order; consumed by {@code checkPolledRecord}. */
    protected Queue<Message> sentMessages;
    /** Messages whose send (or the following commit) threw; filled by {@code sendMessage}. */
    protected Queue<Message> failedMessages;
    /** Every record returned by the task's {@code poll()} during {@code pollMessages}. */
    protected Queue<SourceRecord> polledRecords;
    /** Converter built from {@link #config}; used by {@code assertValue} to rebuild a JMS message. */
    protected RecordConverter converter;
    /** Single-thread executor (see {@link #setup()}) on which {@code pollMessages} runs the poll loop. */
    private Executor pollExecutor;
    /** Test-side JMS connection opened in {@link #setup()} and closed in {@link #tearDown()}. */
    protected Connection connection;
    /** Test-side JMS session created on {@link #connection}; closed in {@link #tearDown()}. */
    protected Session session;
    // Decompiler rendering of the compiler-generated assertion flag; it is assigned in the static
    // initializer at the bottom of the class and read by pollMessages.
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Builds the per-test fixture: reads the settings and configuration from the subclass hooks,
     * resets the bookkeeping queues, starts the task and opens a test-side JMS connection and
     * session using the configured credentials.
     *
     * @throws JMSException if the connection or session cannot be created
     */
    @Before
    public void setup() throws JMSException {
        this.settings = taskConfiguration();
        this.config = createTaskConfig(this.settings);
        this.sentMessages = new LinkedList<>();
        this.failedMessages = new LinkedList<>();
        this.polledRecords = new LinkedList<>();
        this.pollExecutor = Executors.newSingleThreadExecutor();
        this.converter = new RecordConverter(this.config);
        startTask();
        this.connection = this.task.connectionFactory().createConnection(this.config.username(), this.config.password());
        // The session is transacted (first argument); sendMessage relies on that when it commits.
        // CLIENT_ACKNOWLEDGE is the named form of the literal 2 that was passed here before.
        // NOTE(review): the connection is never started in this class; JMS only delivers messages
        // to consumers of a started connection - confirm that this is intentional.
        this.session = this.connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
    }

    /**
     * Sends a single text message (with an explicit message id and no correlation id), expects the
     * task to return one record within a second, and then checks that the queue is empty.
     */
    @Test
    public void pollAndReadOneTextMessageWithoutCorrelationId() throws InterruptedException, JMSException {
        clearMessagesInQueue(1L, TimeUnit.SECONDS);

        final TextMessage outgoing = this.session.createTextMessage("This is a test");
        outgoing.setJMSMessageID("10001");
        sendMessage(outgoing);

        pollMessages(1, 1000L);
        assertNoMessagesInQueue();
    }

    /**
     * Sends a single bytes message (with an explicit message id and JMS type, and no correlation
     * id), expects the task to return one record within a second, and then checks that the queue
     * is empty.
     */
    @Test
    public void pollAndReadOneBytesMessageWithoutCorrelationId() throws InterruptedException, JMSException {
        clearMessagesInQueue(1L, TimeUnit.SECONDS);
        byte[] payload = "This is a bytes message".getBytes(StandardCharsets.UTF_8);
        // Must be typed as BytesMessage: writeBytes is not declared on the Message interface.
        BytesMessage outgoing = this.session.createBytesMessage();
        outgoing.setJMSMessageID("10001");
        outgoing.writeBytes(payload);
        outgoing.setJMSType("unspecified");
        sendMessage(outgoing);
        pollMessages(1, 1000L);
        assertNoMessagesInQueue();
    }

    /**
     * Releases everything {@link #setup()} acquired: stops the task, shuts down the poll executor
     * and closes the test-side session and connection. Each step runs even if an earlier one
     * throws, and the session and connection fields are always reset to {@code null}.
     *
     * @throws JMSException if closing the session or the connection fails
     */
    @After
    public void tearDown() throws JMSException {
        try {
            // The task is null when setup() failed before (or inside) startTask().
            if (this.task != null) {
                this.task.stop();
            }
        } finally {
            // The executor created in setup() owns a non-daemon worker thread; without a shutdown
            // every test leaks one thread. The field is declared as Executor, hence the type check.
            if (this.pollExecutor instanceof java.util.concurrent.ExecutorService) {
                ((java.util.concurrent.ExecutorService) this.pollExecutor).shutdownNow();
            }
            try {
                if (this.session != null) {
                    this.session.close();
                }
            } finally {
                this.session = null;
                try {
                    if (this.connection != null) {
                        this.connection.close();
                    }
                } finally {
                    this.connection = null;
                }
            }
        }
    }

    /** Creates the task through {@link #createTask()}, stores it in {@link #task} and starts it with the current settings. */
    protected void startTask() {
        final T created = createTask();
        this.task = created;
        created.start(this.settings);
    }

    /**
     * Supplies the connector settings for the test. This class reads the keys
     * {@code jms.destination.name} and {@code jms.destination.type} from the returned map and
     * passes the whole map to {@link #createTaskConfig(Map)} and to the task's {@code start}.
     *
     * @return the settings map used for the task under test
     */
    protected abstract Map<String, String> taskConfiguration();

    /**
     * Builds the typed configuration from the raw settings. The result supplies the credentials
     * for the test-side connection and is handed to the {@code RecordConverter}.
     *
     * @param map the settings returned by {@link #taskConfiguration()}
     * @return the configuration object for this test
     */
    protected abstract C createTaskConfig(Map<String, String> map);

    /**
     * Creates the task instance under test; {@link #startTask()} starts it with the settings.
     *
     * @return a new, not yet started task
     */
    protected abstract T createTask();

    /**
     * Fails the test when {@link #hasMessagesInQueue()} reports that messages are still present.
     *
     * @throws JMSException if the destination cannot be inspected
     */
    protected void assertNoMessagesInQueue() throws JMSException {
        final boolean messagesRemaining = hasMessagesInQueue();
        Assert.assertFalse(messagesRemaining);
    }

    /**
     * Reports whether the configured destination currently holds at least one message.
     *
     * @return {@code true} if the destination is a queue and a browser over it sees a message;
     *     {@code false} if it is empty or if the destination is not a queue
     * @throws JMSException if the destination or the browser cannot be created
     */
    protected boolean hasMessagesInQueue() throws JMSException {
        Destination destination = destination();
        // Only queues can be browsed; for any other destination type there is nothing to inspect.
        if (!(destination instanceof javax.jms.Queue)) {
            return false;
        }
        javax.jms.QueueBrowser browser = this.session.createBrowser((javax.jms.Queue) destination);
        try {
            return browser.getEnumeration().hasMoreElements();
        } finally {
            // Release the browser; previously it was left open on the session.
            browser.close();
        }
    }

    /**
     * Drains the configured destination by receiving messages until none arrives within the given
     * wait time, then commits the receives so the drained messages are not redelivered.
     *
     * @param j how long to wait for each message; a value that converts to zero or fewer
     *     milliseconds polls without waiting
     * @param timeUnit the unit of {@code j}
     * @throws JMSException if the consumer cannot be created, a receive fails or the commit fails
     */
    protected void clearMessagesInQueue(long j, TimeUnit timeUnit) throws JMSException {
        Destination destination = destination();
        this.log.trace("Clearing all messages in {}", destination);
        long timeoutMillis = timeUnit.toMillis(j);
        int cleared = 0;
        MessageConsumer consumer = this.session.createConsumer(destination);
        try {
            while (receiveOrNull(consumer, timeoutMillis) != null) {
                cleared++;
                this.log.trace("Found message in {}", destination);
            }
        } finally {
            // Close the consumer so it does not stay subscribed next to the task under test.
            consumer.close();
        }
        // In a transacted session the receives only take effect once committed; otherwise they
        // are rolled back (and the messages redelivered) when the session is closed.
        if (cleared > 0 && this.session.getTransacted()) {
            this.session.commit();
        }
        this.log.trace("Cleared {} messages in {}", cleared, destination);
    }

    /** Receives one message, never blocking indefinitely: receive(0) would wait forever. */
    private static Message receiveOrNull(MessageConsumer consumer, long timeoutMillis) throws JMSException {
        return timeoutMillis > 0 ? consumer.receive(timeoutMillis) : consumer.receiveNoWait();
    }

    /**
     * Resolves the destination named by the {@code jms.destination.name} setting, as a queue or a
     * topic depending on the {@code jms.destination.type} setting.
     *
     * @return the queue or topic; any other type (including a missing one) fails the test
     * @throws JMSException if the session cannot create the destination
     */
    protected Destination destination() throws JMSException {
        final String name = this.settings.get("jms.destination.name");
        final String type = this.settings.get("jms.destination.type");
        final boolean isQueue = "queue".equals(type);
        final boolean isTopic = "topic".equals(type);
        if (!isQueue && !isTopic) {
            Assert.fail("Unexpected type of destination: " + type);
            return null;
        }
        return isQueue ? this.session.createQueue(name) : this.session.createTopic(name);
    }

    /**
     * Sends the given messages, one transaction each, to the configured destination. Every message
     * that is sent and committed is appended to {@link #sentMessages}; a message whose send or
     * commit throws is appended to {@link #failedMessages} and the exception is rethrown, so the
     * remaining messages are not sent.
     *
     * @param messageArr the messages to send; {@code null} or empty does nothing
     * @throws JMSException if creating the producer, sending or committing fails
     */
    protected void sendMessage(Message... messageArr) throws JMSException {
        if (messageArr == null || messageArr.length == 0) {
            return;
        }
        // Resolve the target through destination() so the configured destination type is honoured;
        // previously a queue was created here even when the type setting said "topic".
        MessageProducer producer = this.session.createProducer(destination());
        try {
            for (Message message : messageArr) {
                try {
                    producer.send(message);
                    this.session.commit();
                    this.sentMessages.add(message);
                } catch (JMSException | RuntimeException e) {
                    this.failedMessages.add(message);
                    throw e;
                }
            }
        } finally {
            producer.close();
        }
    }

    /**
     * Matches a polled record against the oldest message still waiting in {@link #sentMessages}
     * and verifies its key and value; fails the test if no sent message is left to match.
     *
     * @param sourceRecord the record returned by the task
     * @throws JMSException if reading the JMS message fails during the comparison
     */
    protected void checkPolledRecord(SourceRecord sourceRecord) throws JMSException {
        final Message expected = this.sentMessages.poll();
        if (expected != null) {
            assertKey(expected, sourceRecord);
            assertValue(expected, sourceRecord);
            return;
        }
        Assert.fail("Read unexpected record from task: " + sourceRecord);
    }

    /**
     * Polls the task on the poll executor until at least {@code i} records have been returned,
     * committing each record and each batch and checking every record against the sent messages.
     *
     * @param i the number of records to wait for; a negative value returns 0 without polling
     * @param j the maximum time to wait for the whole poll loop, in milliseconds
     * @return the number of records polled
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    protected int pollMessages(final int i, long j) throws InterruptedException {
        if (i < 0) {
            return 0;
        }
        FutureTask<Integer> pollLoop = new FutureTask<>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int polled = 0;
                do {
                    List<SourceRecord> batch = task.poll();
                    Assert.assertNotNull(batch);
                    log.debug("Poll returned {} records in batch", batch.size());
                    for (SourceRecord sourceRecord : batch) {
                        polledRecords.add(sourceRecord);
                        polled++;
                        task.commitRecord(sourceRecord);
                        log.debug("Committed record {} in batch of {}", polled, batch.size());
                    }
                    task.commit();
                    log.debug("Committed task, and proceeding to check {} records", batch.size());
                    for (SourceRecord sourceRecord : batch) {
                        checkPolledRecord(sourceRecord);
                    }
                } while (polled < i);
                return polled;
            }
        });
        this.pollExecutor.execute(pollLoop);
        try {
            return pollLoop.get(j, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            // Assertion failures raised on the poll thread are Errors; rethrow them unchanged so
            // the test is reported as a failure rather than as a wrapped RuntimeException.
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new RuntimeException(cause);
        } catch (TimeoutException e) {
            // Interrupt the poll loop so it does not keep running after the test has given up.
            pollLoop.cancel(true);
            Assert.fail("Timed out waiting for polled records: " + e.getMessage());
            // Assert.fail always throws; this only satisfies the compiler's return analysis.
            throw new AssertionError("Should not get here");
        }
    }

    /**
     * Verifies the record key: both the record's key schema and the key struct's own schema must
     * equal {@code RecordConverter.KEY_SCHEMA}, and the {@code messageID} field must be set.
     *
     * @param message the message that was sent (not inspected by this base implementation)
     * @param sourceRecord the record returned by the task
     * @throws JMSException declared so that overriding checks may read from the message
     */
    protected void assertKey(Message message, SourceRecord sourceRecord) throws JMSException {
        Assert.assertEquals(RecordConverter.KEY_SCHEMA, sourceRecord.keySchema());
        final Struct key = (Struct) sourceRecord.key();
        Assert.assertEquals(RecordConverter.KEY_SCHEMA, key.schema());
        final String messageId = key.getString("messageID");
        Assert.assertNotNull(messageId);
    }

    /**
     * Verifies the record value: checks the value schema, converts the record back into a JMS
     * message with the converter and compares it with the message that was sent (destination,
     * delivery mode, priority, expiration, message kind and, for text and bytes messages, body).
     *
     * @param message the message that was sent
     * @param sourceRecord the record returned by the task
     * @throws JMSException if reading either message fails
     */
    protected void assertValue(Message message, SourceRecord sourceRecord) throws JMSException {
        Assert.assertEquals(RecordConverter.VALUE_SCHEMA, sourceRecord.valueSchema());
        Assert.assertEquals(RecordConverter.VALUE_SCHEMA, ((Struct) sourceRecord.value()).schema());
        // Declared as Message: the converted message is a text message only for text input.
        Message converted = this.converter.message(this.session, (Struct) sourceRecord.key(), (Struct) sourceRecord.value());
        Assert.assertEquals(message.getJMSDestination(), converted.getJMSDestination());
        Assert.assertEquals(message.getJMSDeliveryMode(), converted.getJMSDeliveryMode());
        Assert.assertEquals(message.getJMSPriority(), converted.getJMSPriority());
        Assert.assertEquals(message.getJMSExpiration(), converted.getJMSExpiration());
        if (message instanceof TextMessage) {
            Assert.assertTrue(converted instanceof TextMessage);
            Assert.assertEquals(((TextMessage) message).getText(), ((TextMessage) converted).getText());
        } else if (message instanceof BytesMessage) {
            Assert.assertTrue(converted instanceof BytesMessage);
            assertSameBytes((BytesMessage) message, (BytesMessage) converted);
        } else if (message instanceof ObjectMessage) {
            Assert.assertTrue(converted instanceof ObjectMessage);
        } else if (message instanceof StreamMessage) {
            Assert.assertTrue(converted instanceof StreamMessage);
        }
    }

    /** Compares the bodies of two bytes messages, by length and by content. */
    private static void assertSameBytes(BytesMessage expected, BytesMessage actual) throws JMSException {
        // reset() switches each message to read mode, positioned at the start of the body.
        expected.reset();
        actual.reset();
        long expectedLength = expected.getBodyLength();
        long actualLength = actual.getBodyLength();
        Assert.assertEquals(expectedLength, actualLength);
        Assert.assertEquals("Bytes too large", expectedLength, (int) actualLength);
        byte[] expectedBytes = new byte[(int) expectedLength];
        byte[] actualBytes = new byte[(int) actualLength];
        // Read the real payloads; the buffers used to be compared while still zero-filled,
        // which made the content check pass for any two bodies of equal length.
        expected.readBytes(expectedBytes);
        actual.readBytes(actualBytes);
        Assert.assertEquals(ByteBuffer.wrap(expectedBytes), ByteBuffer.wrap(actualBytes));
    }

    // Decompiler rendering of the compiler-generated assertion bootstrap: the flag is true when
    // assertions are disabled for this class. The field is static final, so this initializer
    // must stay for as long as the field declaration does.
    static {
        $assertionsDisabled = !BaseJmsSourceTaskIT.class.desiredAssertionStatus();
    }
}
