package io.confluent.connect.jms;

import io.confluent.connect.jms.BaseJmsSourceConnectorConfig;
import io.confluent.license.util.StringUtils;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/jms/BaseJmsSourceTask.class */
public abstract class BaseJmsSourceTask<C extends BaseJmsSourceConnectorConfig> extends SourceTask {
    static final Logger LOG = LoggerFactory.getLogger(BaseJmsSourceTask.class);
    private static final boolean TRANSACTION_MODE = false;
    private static final int ACKNOWLEDGE_MODE = 2;
    private static final long JMS_RECV_DURATION_MS = 5000;
    private static final long WAIT_FOR_COMMIT_DURATION_MS = 500;
    private static final long TRY_LOCK_DURATION_MS = 10;
    protected C config;
    protected ConnectionFactory connectionFactory;
    protected Connection connection;
    protected Session session;
    protected RecordConverter converter;
    protected MessageConsumer messageConsumer;
    protected final ReentrantLock lock = new ReentrantLock();
    protected final Condition produced = this.lock.newCondition();
    protected boolean messagePending;

    protected abstract ConnectionFactory connectionFactory();

    protected abstract C config(Map<String, String> map);

    public void start(Map<String, String> map) {
        Queue createTopic;
        LOG.trace("start()");
        this.config = config(map);
        this.converter = new RecordConverter(this.config);
        this.connectionFactory = connectionFactory();
        try {
            if (StringUtils.isBlank(this.config.username())) {
                LOG.info("Connecting with no username/password");
                this.connection = this.connectionFactory.createConnection();
            } else {
                LOG.info("Connecting as {}", this.config.username());
                this.connection = this.connectionFactory.createConnection(this.config.username(), this.config.password());
            }
            LOG.info("Starting connection.");
            this.connection.start();
            try {
                this.session = this.connection.createSession(false, ACKNOWLEDGE_MODE);
                try {
                    String str = this.config.destinationType;
                    boolean z = -1;
                    switch (str.hashCode()) {
                        case 107944209:
                            if (str.equals("queue")) {
                                z = TRANSACTION_MODE;
                                break;
                            }
                            break;
                        case 110546223:
                            if (str.equals("topic")) {
                                z = true;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case TRANSACTION_MODE /* 0 */:
                            createTopic = this.session.createQueue(this.config.destinationName);
                            break;
                        case true:
                            createTopic = this.session.createTopic(this.config.destinationName);
                            break;
                        default:
                            throw new UnsupportedOperationException("Destination type of '" + this.config.destinationType + "' is not supported.");
                    }
                    try {
                        this.messageConsumer = this.session.createConsumer(createTopic);
                    } catch (JMSException e) {
                        throw new ConnectException("Exception thrown while message consumer.", e);
                    }
                } catch (JMSException e2) {
                    throw new ConnectException("Exception thrown while creating destination.", e2);
                }
            } catch (JMSException e3) {
                throw new ConnectException("Exception thrown while creating connection.", e3);
            }
        } catch (JMSException e4) {
            throw new ConnectException("Exception thrown while creating connection.", e4);
        }
    }

    protected List<SourceRecord> receive() {
        LOG.trace("receive()");
        try {
            Message receive = this.messageConsumer.receive(JMS_RECV_DURATION_MS);
            if (receive == null) {
                LOG.trace("No message received.");
                return null;
            }
            LOG.trace("Received message with id='{}'", receive.getJMSMessageID());
            this.messagePending = true;
            return Collections.singletonList(this.converter.record(receive));
        } catch (JMSException e) {
            throw new RetriableException("Exception thrown while receiving message.", e);
        }
    }

    public List<SourceRecord> poll() throws InterruptedException {
        LOG.trace("poll()");
        if (!this.lock.tryLock(TRY_LOCK_DURATION_MS, TimeUnit.MILLISECONDS)) {
            LOG.trace("Could not get lock within {} ms, so skipping this poll", Long.valueOf(TRY_LOCK_DURATION_MS));
            return null;
        }
        try {
            if (this.messagePending) {
                LOG.trace("Waiting up to {} ms for messages to be committed", Long.valueOf(WAIT_FOR_COMMIT_DURATION_MS));
                this.produced.await(WAIT_FOR_COMMIT_DURATION_MS, TimeUnit.MILLISECONDS);
            }
            if (!this.messagePending) {
                return receive();
            }
            LOG.trace("Messages not yet committed. Skipping receive().", Long.valueOf(WAIT_FOR_COMMIT_DURATION_MS));
            return null;
        } finally {
            this.lock.unlock();
        }
    }

    public void commitRecord(SourceRecord sourceRecord) {
        LOG.trace("commitRecord()");
        this.lock.lock();
        try {
            try {
                Message message = ((JmsSourceRecord) sourceRecord).message;
                message.acknowledge();
                LOG.trace("Acknowledged message with id='{}'", message.getJMSMessageID());
                this.messagePending = false;
                this.produced.signalAll();
                this.lock.unlock();
            } catch (JMSException e) {
                throw new RetriableException("Exception while calling message.acknowledge()", e);
            }
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    public void stop() {
        LOG.trace("stop()");
        try {
            if (this.connection != null) {
                LOG.info("Closing JMS connection");
                this.connection.close();
            } else {
                LOG.info("JMS connection is null, skipping closing");
            }
        } catch (JMSException e) {
            LOG.error("Exception during close() on connection {}", this.connection, e);
        }
    }
}
