package org.apache.activemq.ra;

import java.lang.reflect.Method;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
import javax.resource.spi.work.WorkManager;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:WEB-INF/lib/activemq-ra-4.1.1.jar:org/apache/activemq/ra/ActiveMQEndpointWorker.class */
public class ActiveMQEndpointWorker {
    public static final Method ON_MESSAGE_METHOD;
    private static final long INITIAL_RECONNECT_DELAY = 1000;
    private static final long MAX_RECONNECT_DELAY = 30000;
    protected final ActiveMQEndpointActivationKey endpointActivationKey;
    protected final MessageEndpointFactory endpointFactory;
    protected final WorkManager workManager;
    protected final boolean transacted;
    private final ActiveMQDestination dest;
    private final Work connectWork;
    private ConnectionConsumer consumer;
    private ServerSessionPoolImpl serverSessionPool;
    private boolean running;
    private ActiveMQConnection connection;
    private long reconnectDelay = 1000;
    private static final Log log = LogFactory.getLog(ActiveMQEndpointWorker.class);
    private static final ThreadLocal threadLocal = new ThreadLocal();

    public static void safeClose(Session session) {
        if (session != null) {
            try {
                session.close();
            } catch (JMSException e) {
            }
        }
    }

    private static void safeClose(Connection connection) {
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
            }
        }
    }

    private static void safeClose(ConnectionConsumer connectionConsumer) {
        if (connectionConsumer != null) {
            try {
                connectionConsumer.close();
            } catch (JMSException e) {
            }
        }
    }

    public ActiveMQEndpointWorker(final ActiveMQResourceAdapter activeMQResourceAdapter, ActiveMQEndpointActivationKey activeMQEndpointActivationKey) throws ResourceException {
        this.endpointActivationKey = activeMQEndpointActivationKey;
        this.endpointFactory = this.endpointActivationKey.getMessageEndpointFactory();
        this.workManager = activeMQResourceAdapter.getBootstrapContext().getWorkManager();
        try {
            this.transacted = this.endpointFactory.isDeliveryTransacted(ON_MESSAGE_METHOD);
            this.connectWork = new Work() { // from class: org.apache.activemq.ra.ActiveMQEndpointWorker.1
                @Override // javax.resource.spi.work.Work
                public void release() {
                }

                @Override // java.lang.Runnable
                public synchronized void run() {
                    if (ActiveMQEndpointWorker.this.running && ActiveMQEndpointWorker.this.connection == null) {
                        ActiveMQActivationSpec activationSpec = ActiveMQEndpointWorker.this.endpointActivationKey.getActivationSpec();
                        try {
                            ActiveMQEndpointWorker.this.connection = activeMQResourceAdapter.makeConnection(activationSpec);
                            ActiveMQEndpointWorker.this.connection.start();
                            ActiveMQEndpointWorker.this.connection.setExceptionListener(new ExceptionListener() { // from class: org.apache.activemq.ra.ActiveMQEndpointWorker.1.1
                                @Override // javax.jms.ExceptionListener
                                public void onException(JMSException jMSException) {
                                    if (ActiveMQEndpointWorker.this.serverSessionPool.isClosing()) {
                                        return;
                                    }
                                    ActiveMQEndpointWorker.this.reconnect(jMSException);
                                }
                            });
                            int maxMessagesPerSessionsIntValue = activationSpec.getMaxMessagesPerSessionsIntValue() * activationSpec.getMaxSessionsIntValue();
                            if (activationSpec.isDurableSubscription()) {
                                ActiveMQEndpointWorker.this.consumer = ActiveMQEndpointWorker.this.connection.createDurableConnectionConsumer((Topic) ActiveMQEndpointWorker.this.dest, activationSpec.getSubscriptionName(), ActiveMQEndpointWorker.this.emptyToNull(activationSpec.getMessageSelector()), ActiveMQEndpointWorker.this.serverSessionPool, maxMessagesPerSessionsIntValue, activationSpec.getNoLocalBooleanValue());
                            } else {
                                ActiveMQEndpointWorker.this.consumer = ActiveMQEndpointWorker.this.connection.createConnectionConsumer(ActiveMQEndpointWorker.this.dest, ActiveMQEndpointWorker.this.emptyToNull(activationSpec.getMessageSelector()), ActiveMQEndpointWorker.this.serverSessionPool, maxMessagesPerSessionsIntValue, activationSpec.getNoLocalBooleanValue());
                            }
                        } catch (JMSException e) {
                            ActiveMQEndpointWorker.log.debug("Fail to to connect: " + e, e);
                            ActiveMQEndpointWorker.this.reconnect(e);
                        }
                    }
                }
            };
            ActiveMQActivationSpec activationSpec = this.endpointActivationKey.getActivationSpec();
            if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
                this.dest = new ActiveMQQueue(activationSpec.getDestination());
            } else {
                if (!"javax.jms.Topic".equals(activationSpec.getDestinationType())) {
                    throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
                }
                this.dest = new ActiveMQTopic(activationSpec.getDestination());
            }
        } catch (NoSuchMethodException e) {
            throw new ResourceException("Endpoint does not implement the onMessage method.");
        }
    }

    public void start() throws ResourceException {
        synchronized (this.connectWork) {
            if (this.running) {
                return;
            }
            this.running = true;
            log.debug("Starting");
            this.serverSessionPool = new ServerSessionPoolImpl(this, this.endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
            connect();
            log.debug("Started");
        }
    }

    public void stop() throws InterruptedException {
        synchronized (this.connectWork) {
            if (this.running) {
                this.running = false;
                this.serverSessionPool.close();
                disconnect();
            }
        }
    }

    private void connect() {
        if (this.running) {
            try {
                this.workManager.scheduleWork(this.connectWork, WorkManager.INDEFINITE, null, null);
            } catch (WorkException e) {
                this.running = false;
                log.error("Work Manager did not accept work: ", e);
            }
        }
    }

    private void disconnect() {
        safeClose(this.consumer);
        this.consumer = null;
        safeClose(this.connection);
        this.connection = null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void reconnect(JMSException jMSException) {
        long j;
        log.debug("Reconnect cause: ", jMSException);
        try {
            synchronized (this.connectWork) {
                j = this.reconnectDelay;
                if (j == MAX_RECONNECT_DELAY) {
                    log.error("Endpoint connection to JMS broker failed: " + jMSException.getMessage());
                    log.error("Endpoint will try to reconnect to the JMS broker in 30 seconds");
                }
                disconnect();
            }
            Thread.sleep(j);
            synchronized (this.connectWork) {
                this.reconnectDelay *= 2;
                if (this.reconnectDelay > MAX_RECONNECT_DELAY) {
                    this.reconnectDelay = MAX_RECONNECT_DELAY;
                }
                connect();
            }
        } catch (InterruptedException e) {
            Thread.interrupted();
        }
    }

    protected void registerThreadSession(Session session) {
        threadLocal.set(session);
    }

    protected void unregisterThreadSession(Session session) {
        threadLocal.set(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ActiveMQConnection getConnection() {
        ActiveMQConnection activeMQConnection;
        synchronized (this.connectWork) {
            activeMQConnection = this.connection;
        }
        return activeMQConnection;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String emptyToNull(String str) {
        if (str == null || str.length() == 0) {
            return null;
        }
        return str;
    }

    static {
        try {
            ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", Message.class);
        } catch (Exception e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}
