package io.confluent.connect.jms;

import io.confluent.connect.jms.BaseJmsSourceConnectorConfig;
import io.confluent.license.util.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/jms/BaseJmsSourceTask.class */
public abstract class BaseJmsSourceTask<C extends BaseJmsSourceConnectorConfig> extends SourceTask {
    static final Logger LOG = LoggerFactory.getLogger(BaseJmsSourceTask.class);
    private static final boolean TRANSACTION_MODE = false;
    private static final int ACKNOWLEDGE_MODE = 2;
    private static final long JMS_RECV_LONG_DURATION_MS = 5000;
    private static final long JMS_RECV_SHORT_DURATION_MS = 10;
    protected C config;
    protected ConnectionFactory connectionFactory;
    protected Connection connection;
    protected Session session;
    protected RecordConverter converter;
    protected MessageConsumer messageConsumer;
    private int batchSize;
    private int maxPending;
    private long maxPollDuration;
    private final Lock pendingCommitsLock = new ReentrantLock();
    private final Condition pendingCommits = this.pendingCommitsLock.newCondition();
    private final AtomicLong numProduced = new AtomicLong();
    private final AtomicBoolean waitForAllPendingToCommit = new AtomicBoolean();
    ConcurrentMap<String, JmsSourceRecord> pendingMessages = new ConcurrentHashMap();

    protected abstract ConnectionFactory connectionFactory();

    protected abstract C config(Map<String, String> map);

    public void start(Map<String, String> map) {
        Queue createTopic;
        LOG.trace("start()");
        this.config = config(map);
        this.converter = new RecordConverter(this.config);
        this.connectionFactory = connectionFactory();
        try {
            if (StringUtils.isBlank(this.config.username())) {
                LOG.info("Connecting with no username/password");
                this.connection = this.connectionFactory.createConnection();
            } else {
                LOG.info("Connecting as {}", this.config.username());
                this.connection = this.connectionFactory.createConnection(this.config.username(), this.config.password());
            }
            LOG.info("Starting connection.");
            this.connection.start();
            try {
                this.session = this.connection.createSession(false, ACKNOWLEDGE_MODE);
                LOG.info("Created session.");
                try {
                    String str = this.config.destinationType;
                    boolean z = -1;
                    switch (str.hashCode()) {
                        case 107944209:
                            if (str.equals("queue")) {
                                z = TRANSACTION_MODE;
                                break;
                            }
                            break;
                        case 110546223:
                            if (str.equals("topic")) {
                                z = true;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case TRANSACTION_MODE /* 0 */:
                            createTopic = this.session.createQueue(this.config.destinationName);
                            break;
                        case true:
                            createTopic = this.session.createTopic(this.config.destinationName);
                            break;
                        default:
                            throw new UnsupportedOperationException("Destination type of '" + this.config.destinationType + "' is not supported.");
                    }
                    try {
                        this.messageConsumer = this.session.createConsumer(createTopic);
                        LOG.info("Created consumer.");
                        this.batchSize = this.config.batchSize();
                        this.maxPending = this.config.maxPending();
                        this.maxPollDuration = this.config.maxPollDuration();
                    } catch (JMSException e) {
                        throw new ConnectException("Exception thrown while message consumer.", e);
                    }
                } catch (JMSException e2) {
                    throw new ConnectException("Exception thrown while creating destination.", e2);
                }
            } catch (JMSException e3) {
                throw new ConnectException("Exception thrown while creating connection.", e3);
            }
        } catch (JMSException e4) {
            throw new ConnectException("Exception thrown while creating connection.", e4);
        }
    }

    protected JmsSourceRecord receive(long j) {
        LOG.trace("receive()");
        try {
            Message receive = this.messageConsumer.receive(j);
            if (receive == null) {
                LOG.trace("No message received.");
                return null;
            }
            LOG.trace("Received message with id='{}'", receive.getJMSMessageID());
            return this.converter.record(receive);
        } catch (JMSException e) {
            throw new RetriableException("Exception thrown while receiving message.", e);
        }
    }

    public List<SourceRecord> poll() throws InterruptedException {
        LOG.trace("poll()");
        if (this.waitForAllPendingToCommit.get()) {
            this.pendingCommitsLock.lock();
            try {
                long j = this.numProduced.get();
                if (j > 0) {
                    long currentTimeMillis = System.currentTimeMillis();
                    this.pendingCommits.await(1L, TimeUnit.SECONDS);
                    LOG.trace("Waited {}ms for {} pending messages to be committed", Long.valueOf(System.currentTimeMillis() - currentTimeMillis), Long.valueOf(j));
                }
                if (this.numProduced.get() > 0) {
                    LOG.debug("Exceeded wait time for pending messages; returning null from poll");
                    return null;
                }
            } finally {
                this.pendingCommitsLock.unlock();
            }
        }
        ArrayList arrayList = TRANSACTION_MODE;
        long j2 = 5000;
        long currentTimeMillis2 = System.currentTimeMillis() + this.maxPollDuration;
        for (int i = TRANSACTION_MODE; i < this.batchSize; i++) {
            if (i != 0 && exceededPollDuration(currentTimeMillis2)) {
                LOG.debug("Exceeded poll duration with {}", Long.valueOf(j2));
                this.waitForAllPendingToCommit.set(true);
                return arrayList;
            }
            JmsSourceRecord receive = receive(j2);
            if (receive == null) {
                LOG.debug("Returning {} records after receive timeout", Integer.valueOf(arrayList == null ? TRANSACTION_MODE : arrayList.size()));
                this.waitForAllPendingToCommit.set(true);
                return arrayList;
            }
            if (arrayList == null) {
                arrayList = new ArrayList();
            }
            arrayList.add(receive);
            this.pendingMessages.put(receive.messageId, receive);
            j2 = 10;
            if (this.numProduced.incrementAndGet() >= this.maxPending) {
                LOG.debug("Returning after reaching max pending records {}", Integer.valueOf(this.maxPending));
                this.waitForAllPendingToCommit.set(true);
                return arrayList;
            }
        }
        return arrayList;
    }

    private boolean exceededPollDuration(long j) {
        return System.currentTimeMillis() > j;
    }

    public void commitRecord(SourceRecord sourceRecord) {
        LOG.trace("commitRecord()");
        this.pendingMessages.remove(((JmsSourceRecord) sourceRecord).messageId);
        if (this.waitForAllPendingToCommit.get() && this.pendingMessages.isEmpty()) {
            try {
                Message message = ((JmsSourceRecord) sourceRecord).message;
                message.acknowledge();
                LOG.trace("Acknowledged message with id='{}'", message.getJMSMessageID());
                this.pendingCommitsLock.lock();
                try {
                    this.numProduced.set(0L);
                    this.waitForAllPendingToCommit.set(false);
                    this.pendingCommits.signalAll();
                    this.pendingCommitsLock.unlock();
                } catch (Throwable th) {
                    this.pendingCommitsLock.unlock();
                    throw th;
                }
            } catch (JMSException e) {
                throw new RetriableException("Exception while calling message.acknowledge()", e);
            }
        }
    }

    public void stop() {
        LOG.trace("stop()");
        try {
            if (this.connection != null) {
                LOG.info("Closing JMS connection");
                this.connection.close();
            } else {
                LOG.info("JMS connection is null, skipping closing");
            }
        } catch (JMSException e) {
            LOG.error("Exception during close() on connection {}", this.connection, e);
        }
    }
}
