package io.confluent.connect.jms;

import io.confluent.connect.jms.BaseJmsSourceConnectorConfig;
import io.confluent.license.util.StringUtils;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/jms/BaseJmsSourceTask.class */
/**
 * Base Kafka Connect source task that reads messages from a JMS queue or topic.
 *
 * <p>{@link #start(Map)} opens a JMS connection, a transacted session and a consumer on the
 * configured destination; {@link #poll()} converts each received message into a source record;
 * {@link #commitRecord(SourceRecord)} acknowledges the underlying message and {@link #commit()}
 * commits the session once at least one record has been acknowledged.
 *
 * <p>Subclasses supply the provider-specific {@link ConnectionFactory} and configuration type.
 *
 * @param <C> concrete configuration type used by the subclass
 */
public abstract class BaseJmsSourceTask<C extends BaseJmsSourceConnectorConfig> extends SourceTask {
    // NOTE(review): the logger is named after JmsSourceTask rather than this class; kept as-is so
    // existing logger names do not change -- confirm this is intentional.
    static final Logger LOG = LoggerFactory.getLogger(JmsSourceTask.class);

    /** Upper bound on how long a single {@link #poll()} waits for a message, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MS = 1000L;

    private static final String DESTINATION_TYPE_QUEUE = "queue";
    private static final String DESTINATION_TYPE_TOPIC = "topic";

    protected C config;
    protected ConnectionFactory connectionFactory;
    protected Connection connection;
    protected Session session;
    protected RecordConverter converter;
    protected MessageConsumer messageConsumer;

    /** Number of records acknowledged since the last successful session commit. */
    private int sessionCommit = 0;

    /**
     * Creates the provider-specific connection factory.
     *
     * @return the factory used by {@link #start(Map)} to open the JMS connection
     */
    protected abstract ConnectionFactory connectionFactory();

    /**
     * Parses the raw task properties into the concrete configuration type.
     *
     * @param map raw task properties
     * @return the parsed configuration
     */
    protected abstract C config(Map<String, String> map);

    /**
     * Opens the JMS connection, session and message consumer described by the configuration.
     *
     * <p>If any step fails after the connection has been created, the connection is closed before
     * the exception propagates so that a failed start does not leak broker resources.
     *
     * @param map raw task properties
     * @throws ConnectException if the connection, session, destination or consumer cannot be created
     * @throws UnsupportedOperationException if the configured destination type is neither
     *     {@code queue} nor {@code topic}
     */
    public void start(Map<String, String> map) {
        LOG.trace("start()");
        this.config = config(map);
        this.converter = new RecordConverter(this.config);
        this.connectionFactory = connectionFactory();

        boolean started = false;
        try {
            openConnection();
            openSession();
            this.messageConsumer = createConsumer(createDestination());
            started = true;
        } finally {
            if (!started) {
                closeConnection();
            }
        }
    }

    /** Creates and starts the JMS connection, authenticating only when a username is configured. */
    private void openConnection() {
        try {
            if (StringUtils.isBlank(this.config.username())) {
                LOG.info("Connecting with no username/password");
                this.connection = this.connectionFactory.createConnection();
            } else {
                LOG.info("Connecting as {}", this.config.username());
                this.connection = this.connectionFactory.createConnection(this.config.username(), this.config.password());
            }
            LOG.info("Starting connection.");
            this.connection.start();
        } catch (JMSException e) {
            throw new ConnectException("Exception thrown while creating connection.", e);
        }
    }

    /** Creates the transacted session that {@link #commit()} later commits. */
    private void openSession() {
        try {
            // The session is transacted; the acknowledge mode is passed through unchanged from the
            // original numeric value (2 == Session.CLIENT_ACKNOWLEDGE).
            this.session = this.connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
        } catch (JMSException e) {
            throw new ConnectException("Exception thrown while creating session.", e);
        }
    }

    /** Resolves the configured destination name to a JMS queue or topic. */
    private Destination createDestination() {
        String destinationType = this.config.destinationType;
        try {
            if (DESTINATION_TYPE_QUEUE.equals(destinationType)) {
                return this.session.createQueue(this.config.destinationName);
            }
            if (DESTINATION_TYPE_TOPIC.equals(destinationType)) {
                return this.session.createTopic(this.config.destinationName);
            }
        } catch (JMSException e) {
            throw new ConnectException("Exception thrown while creating destination.", e);
        }
        throw new UnsupportedOperationException("Destination type of '" + destinationType + "' is not supported.");
    }

    /** Creates the consumer that {@link #poll()} reads from. */
    private MessageConsumer createConsumer(Destination destination) {
        try {
            return this.session.createConsumer(destination);
        } catch (JMSException e) {
            throw new ConnectException("Exception thrown while creating message consumer.", e);
        }
    }

    /**
     * Waits up to {@value #RECEIVE_TIMEOUT_MS} ms for a single message and converts it.
     *
     * <p>The wait is bounded so that control returns to the caller regularly instead of blocking
     * indefinitely when the destination is idle.
     *
     * @return a list holding the converted record, or an empty list if no message arrived
     * @throws RetriableException if the JMS provider fails while receiving
     */
    public List<SourceRecord> poll() throws InterruptedException {
        LOG.trace("poll()");
        try {
            Message received = this.messageConsumer.receive(RECEIVE_TIMEOUT_MS);
            if (received == null) {
                return Collections.emptyList();
            }
            return Arrays.asList(this.converter.record(received));
        } catch (JMSException e) {
            throw new RetriableException("Exception thrown while receiving message.", e);
        }
    }

    /**
     * Acknowledges the JMS message behind the given record and counts it towards the next
     * session commit.
     *
     * @param sourceRecord record produced by {@link #poll()}; must be a {@code JmsSourceRecord}
     * @throws RetriableException if acknowledging the message fails
     */
    public void commitRecord(SourceRecord sourceRecord) throws InterruptedException {
        LOG.trace("commitRecord()");
        JmsSourceRecord jmsSourceRecord = (JmsSourceRecord) sourceRecord;
        try {
            LOG.trace("Calling message.acknowledge(). JMSMessageID='{}'", jmsSourceRecord.message.getJMSMessageID());
            jmsSourceRecord.message.acknowledge();
            this.sessionCommit++;
        } catch (JMSException e) {
            throw new RetriableException("Exception while calling message.acknowledge()", e);
        }
    }

    /**
     * Commits the JMS session if at least one record was acknowledged since the last commit.
     *
     * @throws RetriableException if the session commit fails; the pending count is left untouched
     *     so a later call tries again
     */
    public void commit() throws InterruptedException {
        LOG.trace("commit() - sessionCommit = {}", Integer.valueOf(this.sessionCommit));
        if (this.sessionCommit <= 0) {
            return;
        }
        try {
            LOG.info("Committing Session.");
            this.session.commit();
            this.sessionCommit = 0;
        } catch (JMSException e) {
            throw new RetriableException("Exception thrown during task.commit.", e);
        }
    }

    /** Closes the JMS connection; safe to call even if {@link #start(Map)} never opened one. */
    public void stop() {
        LOG.trace("stop()");
        closeConnection();
    }

    /** Closes the connection if one exists, logging (not propagating) any provider failure. */
    private void closeConnection() {
        Connection toClose = this.connection;
        if (toClose == null) {
            return;
        }
        try {
            toClose.close();
        } catch (JMSException e) {
            LOG.error("Exception during close() on connection {}", toClose, e);
        }
    }
}
