package net.timewalker.ffmq4.cluster.bridge;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.NamingException;
import net.timewalker.ffmq4.cluster.resolver.DestinationResolver;
import net.timewalker.ffmq4.cluster.resolver.SessionDestinationResolver;
import net.timewalker.ffmq4.management.bridge.BridgeDefinition;
import net.timewalker.ffmq4.management.peer.PeerDescriptor;
import net.timewalker.ffmq4.utils.JNDITools;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:net/timewalker/ffmq4/cluster/bridge/JMSBridge.class */
/**
 * Forwards messages from a source JMS destination to a target JMS destination
 * as described by a {@link BridgeDefinition}.
 * <p>
 * {@link #start()} creates and starts a dedicated {@code JMSBridgeThread} that
 * does all the receiving, sending and committing; {@link #stop()} asks that
 * thread to stop and waits for it to terminate. {@code start()}, {@code stop()}
 * and {@code isStarted()} are synchronized on this bridge instance.
 */
public final class JMSBridge implements JMSBridgeMBean {
    protected static final Log log = LogFactory.getLog(JMSBridge.class);

    /** Definition of the bridge (peers, destinations, transaction/ack settings, retry interval). */
    protected BridgeDefinition bridgeDefinition;

    /** Worker thread, non-null only between {@link #start()} and {@link #stop()}. */
    private JMSBridgeThread bridgeThread;

    // Both counters are incremented only from JMSBridgeThread's own (private) methods
    // and are volatile so that the MBean getters read fresh values.
    // NOTE: "x++" on a volatile is not atomic, so a resetStats() racing with an
    // increment may be overwritten by that increment.
    protected volatile long forwardedMessages;
    protected volatile long failures;

    /** Guarded by this bridge's monitor. */
    private boolean started;

    /**
     * Worker thread of the bridge. It lazily creates the source and target JMS
     * resources (connection, session, consumer/producer), recreates them after a
     * failure, and loops receive / forward / commit until {@link #pleaseStop()} is called.
     */
    private class JMSBridgeThread extends Thread {
        private final DestinationResolver destinationResolver;

        // Written by pleaseStop() (another thread) and read by the worker loops
        // without holding the monitor, hence volatile.
        private volatile boolean stopRequired;

        private ConnectionFactory sourceConnectionFactory;
        private ConnectionFactory targetConnectionFactory;
        private Connection sourceConnection;
        private Session sourceSession;
        private MessageConsumer sourceConsumer;
        private Connection targetConnection;
        private Session targetSession;
        private MessageProducer targetProducer;

        /** Debug level state captured once at construction time. */
        private final boolean debugEnabled;

        public JMSBridgeThread() {
            super("JMSBridge[" + JMSBridge.this.bridgeDefinition.getName() + "]");
            this.destinationResolver = new SessionDestinationResolver();
            this.debugEnabled = JMSBridge.log.isDebugEnabled();
        }

        /** Returns the {@code "[bridgeName]"} prefix used by most log messages. */
        private String logPrefix() {
            return "[" + JMSBridge.this.bridgeDefinition.getName() + "]";
        }

        /**
         * Main loop: looks up both connection factories, then repeatedly receives a
         * message from the source, forwards it to the target and commits both sides.
         * All JMS resources are released when the loop exits, whatever the reason.
         */
        @Override
        public void run() {
            try {
                JMSBridge.log.debug(logPrefix() + " JMS bridge thread starting");
                JMSBridge.log.trace(JMSBridge.this.bridgeDefinition);
                this.sourceConnectionFactory = getConnectionFactory(JMSBridge.this.bridgeDefinition.getSource());
                this.targetConnectionFactory = getConnectionFactory(JMSBridge.this.bridgeDefinition.getTarget());

                while (!this.stopRequired) {
                    Message message = receiveFromSource();
                    if (message == null) {
                        if (this.stopRequired) {
                            break;
                        }
                        JMSBridge.this.failures++;
                        JMSBridge.log.error(logPrefix() + " Consumer was closed");
                        dropSourceResources();
                        dropTargetResources();
                        continue;
                    }

                    if (!forwardToTarget(message)) {
                        break;
                    }

                    try {
                        commitBothSides(message);
                        JMSBridge.this.forwardedMessages++;
                        // Only report the message as forwarded once the commit went through.
                        if (this.debugEnabled) {
                            JMSBridge.log.debug(logPrefix() + " Forwarded message : " + message);
                        }
                    } catch (JMSException e) {
                        JMSBridge.this.failures++;
                        JMSBridge.log.error(logPrefix() + " Double phase commit failed", e);
                        dropSourceResources();
                        dropTargetResources();
                    }
                }
            } catch (Throwable th) {
                // Last line of defense for the thread: log whatever escaped the loop.
                JMSBridge.log.fatal(logPrefix() + " JMSBridge thread failed", th);
            } finally {
                dropSourceResources();
                dropTargetResources();
                JMSBridge.log.debug(logPrefix() + " JMS bridge thread exiting");
            }
        }

        /**
         * Commits (or acknowledges) the source side and commits the target side,
         * in the order selected by {@code isCommitSourceFirst()}.
         *
         * @param message the message that was just forwarded
         * @throws JMSException if a commit/acknowledge fails or a required session is gone
         */
        private void commitBothSides(Message message) throws JMSException {
            if (JMSBridge.this.bridgeDefinition.isCommitSourceFirst()) {
                commitSource(message);
                commitTarget();
            } else {
                commitTarget();
                commitSource(message);
            }
        }

        /** Commits the source session if transacted, else acknowledges in CLIENT_ACKNOWLEDGE mode. */
        private void commitSource(Message message) throws JMSException {
            if (JMSBridge.this.bridgeDefinition.isConsumerTransacted()) {
                requireSession(getSourceSession(), "source").commit();
            } else if (JMSBridge.this.bridgeDefinition.getConsumerAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) {
                message.acknowledge();
            }
        }

        /** Commits the target session if the producer is transacted. */
        private void commitTarget() throws JMSException {
            if (JMSBridge.this.bridgeDefinition.isProducerTransacted()) {
                requireSession(getTargetSession(), "target").commit();
            }
        }

        /**
         * Turns a missing session into a JMSException instead of a NullPointerException.
         * The session getters return null only once a stop has been requested
         * (pleaseStop() may have dropped the source resources in the meantime).
         */
        private Session requireSession(Session session, String side) throws JMSException {
            if (session == null) {
                throw new JMSException(logPrefix() + " No " + side + " session available (bridge is stopping)");
            }
            return session;
        }

        /**
         * Blocks until a message is received from the source consumer.
         * On a JMS failure, or when receive() returns null without a stop request,
         * the source resources are dropped and the attempt is retried after the
         * retry interval.
         *
         * @return the received message, or null if a stop was requested
         */
        private Message receiveFromSource() {
            while (!this.stopRequired) {
                try {
                    MessageConsumer consumer = getSourceConsumer();
                    if (consumer == null) {
                        return null;
                    }
                    Message received = consumer.receive();
                    if (received != null) {
                        return received;
                    }
                    if (this.stopRequired) {
                        return null;
                    }
                    JMSBridge.log.error("Consumer was unexpectedly closed, restarting bridge.");
                    dropSourceResources();
                    retryWait();
                } catch (JMSException e) {
                    JMSBridge.this.failures++;
                    JMSBridge.log.error(logPrefix() + " Receive failed", e);
                    dropSourceResources();
                    retryWait();
                }
            }
            return null;
        }

        /**
         * Sends the message to the target producer, retrying after a failure until
         * it succeeds or a stop is requested. A message whose expiration time has
         * already passed is discarded (and reported as handled); otherwise the
         * remaining lifetime is used as time-to-live (0 when there is no expiration).
         *
         * @param message the message to forward
         * @return true if the message was sent or discarded as expired,
         *         false if a stop was requested before it could be sent
         */
        private boolean forwardToTarget(Message message) {
            while (!this.stopRequired) {
                try {
                    MessageProducer producer = getTargetProducer();
                    if (producer == null) {
                        return false;
                    }
                    long timeToLive = 0;
                    long expiration = message.getJMSExpiration();
                    if (expiration > 0) {
                        long now = System.currentTimeMillis();
                        if (now >= expiration) {
                            JMSBridge.log.warn("Message " + message.getJMSMessageID() + " has expired, discarding it.");
                            return true;
                        }
                        timeToLive = expiration - now;
                    }
                    producer.send(message, JMSBridge.this.bridgeDefinition.getProducerDeliveryMode(), message.getJMSPriority(), timeToLive);
                    return true;
                } catch (JMSException e) {
                    JMSBridge.this.failures++;
                    JMSBridge.log.error(logPrefix() + " Send failed", e);
                    dropTargetResources();
                    retryWait();
                }
            }
            return false;
        }

        /**
         * Requests the thread to stop: raises the stop flag, wakes up a pending
         * {@link #retryWait()} and drops the source resources.
         */
        public synchronized void pleaseStop() {
            this.stopRequired = true;
            notifyAll();
            dropSourceResources();
        }

        /**
         * Looks up the connection factory of a peer through JNDI.
         *
         * @param peerDescriptor peer whose JNDI settings are used for the lookup
         * @return the connection factory bound under the peer's connection factory name
         * @throws JMSException if the JNDI lookup fails; the NamingException is attached
         *         as linked exception and cause
         */
        private ConnectionFactory getConnectionFactory(PeerDescriptor peerDescriptor) throws JMSException {
            try {
                return (ConnectionFactory) JNDITools.getContext(peerDescriptor.getJdniInitialContextFactoryName(), peerDescriptor.getProviderURL(), null).lookup(peerDescriptor.getJndiConnectionFactoryName());
            } catch (NamingException e) {
                JMSException jmsException = new JMSException("JNDI error : " + e.toString());
                // Keep the original failure (and its stack trace) reachable.
                jmsException.setLinkedException(e);
                jmsException.initCause(e);
                throw jmsException;
            }
        }

        /**
         * Returns the source consumer, creating it (and starting the source
         * connection) if needed; retries after a failure until a stop is requested.
         *
         * @return the consumer, or null if a stop was requested before it could be created
         */
        private synchronized MessageConsumer getSourceConsumer() {
            if (this.sourceConsumer == null) {
                while (!this.stopRequired) {
                    try {
                        Session session = getSourceSession();
                        if (session != null) {
                            this.sourceConsumer = session.createConsumer(this.destinationResolver.getDestination(JMSBridge.this.bridgeDefinition.getSource(), JMSBridge.this.bridgeDefinition.getSourceDestination(), session));
                            getSourceConnection().start();
                        }
                        break;
                    } catch (JMSException e) {
                        JMSBridge.this.failures++;
                        JMSBridge.log.error(logPrefix() + " Cannot create consumer on source queuer", e);
                        dropSourceResources();
                        retryWait();
                    }
                }
            }
            return this.sourceConsumer;
        }

        /**
         * Returns the target producer, creating it if needed; retries after a
         * failure until a stop is requested.
         *
         * @return the producer, or null if a stop was requested before it could be created
         */
        private synchronized MessageProducer getTargetProducer() {
            if (this.targetProducer == null) {
                while (!this.stopRequired) {
                    try {
                        Session session = getTargetSession();
                        if (session != null) {
                            this.targetProducer = session.createProducer(this.destinationResolver.getDestination(JMSBridge.this.bridgeDefinition.getTarget(), JMSBridge.this.bridgeDefinition.getTargetDestination(), session));
                        }
                        break;
                    } catch (JMSException e) {
                        JMSBridge.this.failures++;
                        JMSBridge.log.error(logPrefix() + " Cannot create producer on target queuer", e);
                        dropTargetResources();
                        retryWait();
                    }
                }
            }
            return this.targetProducer;
        }

        /**
         * Returns the target session, creating it if needed. The session is
         * transacted when the producer is transacted, AUTO_ACKNOWLEDGE otherwise.
         *
         * @return the session, or null if a stop was requested before it could be created
         */
        private synchronized Session getTargetSession() {
            if (this.targetSession == null) {
                while (!this.stopRequired) {
                    try {
                        Connection connection = getTargetConnection();
                        if (connection != null) {
                            boolean transacted = JMSBridge.this.bridgeDefinition.isProducerTransacted();
                            this.targetSession = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
                        }
                        break;
                    } catch (JMSException e) {
                        JMSBridge.this.failures++;
                        JMSBridge.log.error(logPrefix() + " Cannot create session on target queuer", e);
                        dropTargetResources();
                        retryWait();
                    }
                }
            }
            return this.targetSession;
        }

        /**
         * Returns the source session, creating it if needed. The session is
         * transacted when the consumer is transacted, otherwise it uses the
         * configured consumer acknowledge mode.
         *
         * @return the session, or null if a stop was requested before it could be created
         */
        private synchronized Session getSourceSession() {
            if (this.sourceSession == null) {
                while (!this.stopRequired) {
                    try {
                        Connection connection = getSourceConnection();
                        if (connection != null) {
                            boolean transacted = JMSBridge.this.bridgeDefinition.isConsumerTransacted();
                            this.sourceSession = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : JMSBridge.this.bridgeDefinition.getConsumerAcknowledgeMode());
                        }
                        break;
                    } catch (JMSException e) {
                        JMSBridge.this.failures++;
                        JMSBridge.log.error(logPrefix() + " Cannot create session on source queuer", e);
                        dropSourceResources();
                        retryWait();
                    }
                }
            }
            return this.sourceSession;
        }

        /**
         * Returns the target connection, opening it with the target peer's
         * credentials if needed; retries after a failure until a stop is requested.
         *
         * @return the connection, or null if a stop was requested before it could be opened
         */
        private synchronized Connection getTargetConnection() {
            if (this.targetConnection == null) {
                while (!this.stopRequired) {
                    try {
                        this.targetConnection = this.targetConnectionFactory.createConnection(JMSBridge.this.bridgeDefinition.getTarget().getUserName(), JMSBridge.this.bridgeDefinition.getTarget().getPassword());
                        break;
                    } catch (JMSException e) {
                        JMSBridge.this.failures++;
                        JMSBridge.log.error(logPrefix() + " Cannot create connection to target queuer", e);
                        dropTargetResources();
                        retryWait();
                    }
                }
            }
            return this.targetConnection;
        }

        /**
         * Returns the source connection, opening it with the source peer's
         * credentials if needed; retries after a failure until a stop is requested.
         *
         * @return the connection, or null if a stop was requested before it could be opened
         */
        private synchronized Connection getSourceConnection() {
            if (this.sourceConnection == null) {
                while (!this.stopRequired) {
                    try {
                        this.sourceConnection = this.sourceConnectionFactory.createConnection(JMSBridge.this.bridgeDefinition.getSource().getUserName(), JMSBridge.this.bridgeDefinition.getSource().getPassword());
                        break;
                    } catch (JMSException e) {
                        JMSBridge.this.failures++;
                        JMSBridge.log.error(logPrefix() + " Cannot create connection to source queuer", e);
                        dropSourceResources();
                        retryWait();
                    }
                }
            }
            return this.sourceConnection;
        }

        /**
         * Waits for the configured retry interval (in seconds) unless a stop was
         * requested; {@link #pleaseStop()} wakes the wait up early.
         */
        private synchronized void retryWait() {
            if (this.stopRequired) {
                return;
            }
            int retryInterval = JMSBridge.this.bridgeDefinition.getRetryInterval();
            JMSBridge.log.error(logPrefix() + " Waiting " + retryInterval + " second(s) before retrying");

            // long arithmetic: an int multiplication could overflow for large intervals.
            long delayMillis = retryInterval * 1000L;
            if (delayMillis <= 0) {
                // wait(0) would block until notified and a negative timeout is rejected
                // by Object.wait, so a non-positive interval means "retry immediately".
                return;
            }
            try {
                wait(delayMillis);
            } catch (InterruptedException e) {
                // The interrupt flag is not restored here: with the flag set, every
                // subsequent wait() would return immediately and the retry loops
                // would spin without any delay.
                JMSBridge.log.error(logPrefix() + " Retry wait was interrupted");
            }
        }

        /**
         * Closes and forgets the source consumer, session and connection.
         * Close failures are logged and do not prevent the remaining resources
         * from being closed.
         */
        private synchronized void dropSourceResources() {
            try {
                if (this.sourceConsumer != null) {
                    this.sourceConsumer.close();
                }
            } catch (Exception e) {
                JMSBridge.log.error(logPrefix() + " Could not close source consumer", e);
            } finally {
                this.sourceConsumer = null;
            }
            try {
                if (this.sourceSession != null) {
                    this.sourceSession.close();
                }
            } catch (Exception e) {
                JMSBridge.log.error(logPrefix() + " Could not close source session", e);
            } finally {
                this.sourceSession = null;
            }
            try {
                if (this.sourceConnection != null) {
                    this.sourceConnection.close();
                }
            } catch (Exception e) {
                JMSBridge.log.error(logPrefix() + " Could not close source connection", e);
            } finally {
                this.sourceConnection = null;
            }
        }

        /**
         * Closes and forgets the target producer, session and connection.
         * Close failures are logged and do not prevent the remaining resources
         * from being closed.
         */
        private synchronized void dropTargetResources() {
            try {
                if (this.targetProducer != null) {
                    this.targetProducer.close();
                }
            } catch (Exception e) {
                JMSBridge.log.error(logPrefix() + " Could not close target producer", e);
            } finally {
                this.targetProducer = null;
            }
            try {
                if (this.targetSession != null) {
                    this.targetSession.close();
                }
            } catch (Exception e) {
                JMSBridge.log.error(logPrefix() + " Could not close target session", e);
            } finally {
                this.targetSession = null;
            }
            try {
                if (this.targetConnection != null) {
                    this.targetConnection.close();
                }
            } catch (Exception e) {
                JMSBridge.log.error(logPrefix() + " Could not close target connection", e);
            } finally {
                this.targetConnection = null;
            }
        }
    }

    /**
     * Creates a bridge for the given definition. The bridge is not started.
     *
     * @param bridgeDefinition the definition this bridge reads all its settings from
     */
    public JMSBridge(BridgeDefinition bridgeDefinition) {
        this.bridgeDefinition = bridgeDefinition;
    }

    /** Returns the name of the bridge definition. */
    @Override
    public String getName() {
        return this.bridgeDefinition.getName();
    }

    /** Returns the definition this bridge was created with. */
    public BridgeDefinition getBridgeDefinition() {
        return this.bridgeDefinition;
    }

    /** Returns the number of messages forwarded and committed since the last reset. */
    @Override
    public long getForwardedMessages() {
        return this.forwardedMessages;
    }

    /** Returns the number of failures counted since the last reset. */
    @Override
    public long getFailures() {
        return this.failures;
    }

    /** Resets both the forwarded messages and the failures counters to zero. */
    @Override
    public void resetStats() {
        this.forwardedMessages = 0L;
        this.failures = 0L;
    }

    /** Starts the bridge thread; does nothing if the bridge is already started. */
    @Override
    public synchronized void start() {
        if (this.started) {
            return;
        }
        this.bridgeThread = new JMSBridgeThread();
        this.bridgeThread.start();
        log.info("[" + this.bridgeDefinition.getName() + "] JMS bridge started");
        this.started = true;
    }

    /**
     * Asks the bridge thread to stop and waits for it to terminate; does nothing
     * if the bridge is not started. If the wait is interrupted, the bridge is still
     * marked as stopped and the caller's interrupt status is restored.
     */
    @Override
    public synchronized void stop() {
        if (!this.started) {
            return;
        }
        this.bridgeThread.pleaseStop();
        try {
            this.bridgeThread.join();
        } catch (InterruptedException e) {
            log.error("Wait for bridge thread completion was interrupted");
            // Let the caller observe that it was interrupted.
            Thread.currentThread().interrupt();
        } finally {
            this.bridgeThread = null;
        }
        log.info("[" + this.bridgeDefinition.getName() + "] JMS bridge stopped.");
        this.started = false;
    }

    /** Returns true between a successful {@link #start()} and the next {@link #stop()}. */
    @Override
    public synchronized boolean isStarted() {
        return this.started;
    }

    /** Returns the retry interval of the bridge definition (used as seconds by the bridge thread). */
    @Override
    public int getRetryInterval() {
        return this.bridgeDefinition.getRetryInterval();
    }

    /** Returns whether the source side is committed before the target side. */
    @Override
    public boolean isCommitSourceFirst() {
        return this.bridgeDefinition.isCommitSourceFirst();
    }

    /** Returns whether the target (producer) session is transacted. */
    @Override
    public boolean isProducerTransacted() {
        return this.bridgeDefinition.isProducerTransacted();
    }

    /** Returns whether the source (consumer) session is transacted. */
    @Override
    public boolean isConsumerTransacted() {
        return this.bridgeDefinition.isConsumerTransacted();
    }

    /** Returns the acknowledge mode used for a non-transacted source session. */
    @Override
    public int getConsumerAcknowledgeMode() {
        return this.bridgeDefinition.getConsumerAcknowledgeMode();
    }

    /** Returns the delivery mode used when sending to the target. */
    @Override
    public int getProducerDeliveryMode() {
        return this.bridgeDefinition.getProducerDeliveryMode();
    }
}
