package net.timewalker.ffmq3.cluster.bridge;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.NamingException;
import net.timewalker.ffmq3.cluster.resolver.DestinationResolver;
import net.timewalker.ffmq3.cluster.resolver.SessionDestinationResolver;
import net.timewalker.ffmq3.management.bridge.BridgeDefinition;
import net.timewalker.ffmq3.management.peer.PeerDescriptor;
import net.timewalker.ffmq3.utils.JNDITools;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:net/timewalker/ffmq3/cluster/bridge/JMSBridge.class */
public final class JMSBridge implements JMSBridgeMBean {
    protected static final Log log;
    protected BridgeDefinition bridgeDefinition;
    private JMSBridgeThread bridgeThread;
    protected volatile long forwardedMessages;
    protected volatile long failures;
    private boolean started;
    static Class class$net$timewalker$ffmq3$cluster$bridge$JMSBridge;

    /* loaded from: input_file:net/timewalker/ffmq3/cluster/bridge/JMSBridge$JMSBridgeThread.class */
    private class JMSBridgeThread extends Thread {
        private DestinationResolver destinationResolver;
        private boolean stopRequired;
        private ConnectionFactory sourceConnectionFactory;
        private ConnectionFactory targetConnectionFactory;
        private Connection sourceConnection;
        private Session sourceSession;
        private MessageConsumer sourceConsumer;
        private Connection targetConnection;
        private Session targetSession;
        private MessageProducer targetProducer;
        private boolean debugEnabled;
        private final JMSBridge this$0;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        public JMSBridgeThread(JMSBridge jMSBridge) {
            super(new StringBuffer().append("JMSBridge[").append(jMSBridge.bridgeDefinition.getName()).append("]").toString());
            this.this$0 = jMSBridge;
            this.destinationResolver = new SessionDestinationResolver();
            this.debugEnabled = JMSBridge.log.isDebugEnabled();
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                JMSBridge.log.debug(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] JMS bridge thread starting").toString());
                JMSBridge.log.trace(this.this$0.bridgeDefinition);
                this.sourceConnectionFactory = getConnectionFactory(this.this$0.bridgeDefinition.getSource());
                this.targetConnectionFactory = getConnectionFactory(this.this$0.bridgeDefinition.getTarget());
                while (!this.stopRequired) {
                    Message receiveFromSource = receiveFromSource();
                    if (receiveFromSource == null) {
                        if (this.stopRequired) {
                            break;
                        }
                        this.this$0.failures++;
                        JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Consumer was closed").toString());
                        dropSourceResources();
                        dropTargetResources();
                    } else {
                        if (!forwardToTarget(receiveFromSource)) {
                            break;
                        }
                        try {
                            if (this.this$0.bridgeDefinition.isCommitSourceFirst()) {
                                if (this.this$0.bridgeDefinition.isConsumerTransacted()) {
                                    getSourceSession().commit();
                                } else if (this.this$0.bridgeDefinition.getConsumerAcknowledgeMode() == 2) {
                                    receiveFromSource.acknowledge();
                                }
                                if (this.this$0.bridgeDefinition.isProducerTransacted()) {
                                    getTargetSession().commit();
                                }
                            } else {
                                if (this.this$0.bridgeDefinition.isProducerTransacted()) {
                                    getTargetSession().commit();
                                }
                                if (this.this$0.bridgeDefinition.isConsumerTransacted()) {
                                    getSourceSession().commit();
                                } else if (this.this$0.bridgeDefinition.getConsumerAcknowledgeMode() == 2) {
                                    receiveFromSource.acknowledge();
                                }
                            }
                            this.this$0.forwardedMessages++;
                        } catch (JMSException e) {
                            this.this$0.failures++;
                            JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Double phase commit failed").toString(), e);
                            dropSourceResources();
                            dropTargetResources();
                        }
                        if (this.debugEnabled) {
                            JMSBridge.log.debug(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Forwarded message : ").append(receiveFromSource).toString());
                        }
                    }
                }
            } catch (Throwable th) {
                JMSBridge.log.fatal(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] JMSBridge thread failed").toString(), th);
            } finally {
                dropSourceResources();
                dropTargetResources();
                JMSBridge.log.debug(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] JMS bridge thread exiting").toString());
            }
        }

        private Message receiveFromSource() {
            MessageConsumer sourceConsumer;
            while (!this.stopRequired) {
                try {
                    sourceConsumer = getSourceConsumer();
                } catch (JMSException e) {
                    this.this$0.failures++;
                    JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Receive failed").toString(), e);
                    dropSourceResources();
                    retryWait();
                }
                if (sourceConsumer == null) {
                    return null;
                }
                Message receive = sourceConsumer.receive();
                if (receive != null) {
                    return receive;
                }
                if (this.stopRequired) {
                    return null;
                }
                JMSBridge.log.error("Consumer was unexpectedly closed, restarting bridge.");
                dropSourceResources();
                retryWait();
            }
            return null;
        }

        private boolean forwardToTarget(Message message) {
            while (!this.stopRequired) {
                try {
                    MessageProducer targetProducer = getTargetProducer();
                    if (targetProducer == null) {
                        return false;
                    }
                    long j = 0;
                    if (message.getJMSExpiration() > 0) {
                        long currentTimeMillis = System.currentTimeMillis();
                        if (currentTimeMillis >= message.getJMSExpiration()) {
                            JMSBridge.log.warn(new StringBuffer().append("Message ").append(message.getJMSMessageID()).append(" has expired, discarding it.").toString());
                            return true;
                        }
                        j = message.getJMSExpiration() - currentTimeMillis;
                    }
                    targetProducer.send(message, this.this$0.bridgeDefinition.getProducerDeliveryMode(), message.getJMSPriority(), j);
                    return true;
                } catch (JMSException e) {
                    this.this$0.failures++;
                    JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Send failed").toString(), e);
                    dropTargetResources();
                    retryWait();
                }
            }
            return false;
        }

        public synchronized void pleaseStop() {
            this.stopRequired = true;
            notify();
            dropSourceResources();
        }

        private ConnectionFactory getConnectionFactory(PeerDescriptor peerDescriptor) throws JMSException {
            try {
                return (ConnectionFactory) JNDITools.getContext(peerDescriptor.getJdniInitialContextFactoryName(), peerDescriptor.getProviderURL(), null).lookup(peerDescriptor.getJndiConnectionFactoryName());
            } catch (NamingException e) {
                throw new JMSException(new StringBuffer().append("JNDI error : ").append(e.toString()).toString());
            }
        }

        private synchronized MessageConsumer getSourceConsumer() {
            if (this.sourceConsumer == null) {
                while (!this.stopRequired) {
                    try {
                        Session sourceSession = getSourceSession();
                        if (sourceSession != null) {
                            this.sourceConsumer = sourceSession.createConsumer(this.destinationResolver.getDestination(this.this$0.bridgeDefinition.getSource(), this.this$0.bridgeDefinition.getSourceDestination(), sourceSession));
                            getSourceConnection().start();
                            break;
                        }
                        break;
                    } catch (JMSException e) {
                        this.this$0.failures++;
                        JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Cannot create consumer on source queuer").toString(), e);
                        dropSourceResources();
                        retryWait();
                    }
                }
            }
            return this.sourceConsumer;
        }

        private synchronized MessageProducer getTargetProducer() {
            if (this.targetProducer == null) {
                while (!this.stopRequired) {
                    try {
                        Session targetSession = getTargetSession();
                        if (targetSession != null) {
                            this.targetProducer = targetSession.createProducer(this.destinationResolver.getDestination(this.this$0.bridgeDefinition.getTarget(), this.this$0.bridgeDefinition.getTargetDestination(), targetSession));
                            break;
                        }
                        break;
                    } catch (JMSException e) {
                        this.this$0.failures++;
                        JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Cannot create producer on target queuer").toString(), e);
                        dropTargetResources();
                        retryWait();
                    }
                }
            }
            return this.targetProducer;
        }

        private synchronized Session getTargetSession() {
            if (this.targetSession == null) {
                while (!this.stopRequired) {
                    try {
                        Connection targetConnection = getTargetConnection();
                        if (targetConnection != null) {
                            this.targetSession = targetConnection.createSession(this.this$0.bridgeDefinition.isProducerTransacted(), this.this$0.bridgeDefinition.isProducerTransacted() ? 0 : 1);
                            break;
                        }
                        break;
                    } catch (JMSException e) {
                        this.this$0.failures++;
                        JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Cannot create session on target queuer").toString(), e);
                        dropTargetResources();
                        retryWait();
                    }
                }
            }
            return this.targetSession;
        }

        private synchronized Session getSourceSession() {
            if (this.sourceSession == null) {
                while (!this.stopRequired) {
                    try {
                        Connection sourceConnection = getSourceConnection();
                        if (sourceConnection != null) {
                            this.sourceSession = sourceConnection.createSession(this.this$0.bridgeDefinition.isConsumerTransacted(), this.this$0.bridgeDefinition.isConsumerTransacted() ? 0 : this.this$0.bridgeDefinition.getConsumerAcknowledgeMode());
                            break;
                        }
                        break;
                    } catch (JMSException e) {
                        this.this$0.failures++;
                        JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Cannot create session on source queuer").toString(), e);
                        dropSourceResources();
                        retryWait();
                    }
                }
            }
            return this.sourceSession;
        }

        private synchronized Connection getTargetConnection() {
            if (this.targetConnection == null) {
                while (!this.stopRequired) {
                    try {
                        this.targetConnection = this.targetConnectionFactory.createConnection(this.this$0.bridgeDefinition.getTarget().getUserName(), this.this$0.bridgeDefinition.getTarget().getPassword());
                        break;
                    } catch (JMSException e) {
                        this.this$0.failures++;
                        JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Cannot create connection to target queuer").toString(), e);
                        dropTargetResources();
                        retryWait();
                    }
                }
            }
            return this.targetConnection;
        }

        private synchronized Connection getSourceConnection() {
            if (this.sourceConnection == null) {
                while (!this.stopRequired) {
                    try {
                        this.sourceConnection = this.sourceConnectionFactory.createConnection(this.this$0.bridgeDefinition.getSource().getUserName(), this.this$0.bridgeDefinition.getSource().getPassword());
                        break;
                    } catch (JMSException e) {
                        this.this$0.failures++;
                        JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Cannot create connection to source queuer").toString(), e);
                        dropSourceResources();
                        retryWait();
                    }
                }
            }
            return this.sourceConnection;
        }

        private synchronized void retryWait() {
            if (this.stopRequired) {
                return;
            }
            JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Waiting ").append(this.this$0.bridgeDefinition.getRetryInterval()).append(" second(s) before retrying").toString());
            try {
                wait(this.this$0.bridgeDefinition.getRetryInterval() * 1000);
            } catch (InterruptedException e) {
                JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Retry wait was interrupted").toString());
            }
        }

        private synchronized void dropSourceResources() {
            try {
                if (this.sourceConsumer != null) {
                    this.sourceConsumer.close();
                }
            } catch (Exception e) {
                JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Could not close source consumer").toString(), e);
            } finally {
                this.sourceConsumer = null;
            }
            try {
                if (this.sourceSession != null) {
                    this.sourceSession.close();
                }
            } catch (Exception e2) {
                JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Could not close source session").toString(), e2);
            } finally {
                this.sourceSession = null;
            }
            try {
                try {
                    if (this.sourceConnection != null) {
                        this.sourceConnection.close();
                    }
                    this.sourceConnection = null;
                } catch (Exception e3) {
                    JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Could not close source connection").toString(), e3);
                    this.sourceConnection = null;
                }
            } catch (Throwable th) {
                this.sourceConnection = null;
                throw th;
            }
        }

        private synchronized void dropTargetResources() {
            try {
                if (this.targetProducer != null) {
                    this.targetProducer.close();
                }
            } catch (Exception e) {
                JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Could not close target producer").toString(), e);
            } finally {
                this.targetProducer = null;
            }
            try {
                if (this.targetSession != null) {
                    this.targetSession.close();
                }
            } catch (Exception e2) {
                JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Could not close target session").toString(), e2);
            } finally {
                this.targetSession = null;
            }
            try {
                try {
                    if (this.targetConnection != null) {
                        this.targetConnection.close();
                    }
                    this.targetConnection = null;
                } catch (Exception e3) {
                    JMSBridge.log.error(new StringBuffer().append("[").append(this.this$0.bridgeDefinition.getName()).append("] Could not close target connection").toString(), e3);
                    this.targetConnection = null;
                }
            } catch (Throwable th) {
                this.targetConnection = null;
                throw th;
            }
        }
    }

    public JMSBridge(BridgeDefinition bridgeDefinition) {
        this.bridgeDefinition = bridgeDefinition;
    }

    @Override // net.timewalker.ffmq3.cluster.bridge.JMSBridgeMBean
    public String getName() {
        return this.bridgeDefinition.getName();
    }

    public BridgeDefinition getBridgeDefinition() {
        return this.bridgeDefinition;
    }

    @Override // net.timewalker.ffmq3.cluster.bridge.JMSBridgeMBean
    public long getForwardedMessages() {
        return this.forwardedMessages;
    }

    @Override // net.timewalker.ffmq3.cluster.bridge.JMSBridgeMBean
    public long getFailures() {
        return this.failures;
    }

    @Override // net.timewalker.ffmq3.cluster.bridge.JMSBridgeMBean
    public void resetStats() {
        this.forwardedMessages = 0L;
        this.failures = 0L;
    }

    @Override // net.timewalker.ffmq3.cluster.bridge.JMSBridgeMBean
    public synchronized void start() {
        if (this.started) {
            return;
        }
        this.bridgeThread = new JMSBridgeThread(this);
        this.bridgeThread.start();
        log.info(new StringBuffer().append("[").append(this.bridgeDefinition.getName()).append("] JMS bridge started").toString());
        this.started = true;
    }

    @Override // net.timewalker.ffmq3.cluster.bridge.JMSBridgeMBean
    public synchronized void stop() {
        if (this.started) {
            this.bridgeThread.pleaseStop();
            try {
                this.bridgeThread.join();
            } catch (InterruptedException e) {
                log.error("Wait for bridge thread completion was interrupted");
            } finally {
                this.bridgeThread = null;
            }
            log.info(new StringBuffer().append("[").append(this.bridgeDefinition.getName()).append("] JMS bridge stopped.").toString());
            this.started = false;
        }
    }

    @Override // net.timewalker.ffmq3.cluster.bridge.JMSBridgeMBean
    public synchronized boolean isStarted() {
        return this.started;
    }

    @Override // net.timewalker.ffmq3.cluster.bridge.JMSBridgeDefinitionMBean
    public int getRetryInterval() {
        return this.bridgeDefinition.getRetryInterval();
    }

    @Override // net.timewalker.ffmq3.cluster.bridge.JMSBridgeDefinitionMBean
    public boolean isCommitSourceFirst() {
        return this.bridgeDefinition.isCommitSourceFirst();
    }

    @Override // net.timewalker.ffmq3.cluster.bridge.JMSBridgeDefinitionMBean
    public boolean isProducerTransacted() {
        return this.bridgeDefinition.isProducerTransacted();
    }

    @Override // net.timewalker.ffmq3.cluster.bridge.JMSBridgeDefinitionMBean
    public boolean isConsumerTransacted() {
        return this.bridgeDefinition.isConsumerTransacted();
    }

    @Override // net.timewalker.ffmq3.cluster.bridge.JMSBridgeDefinitionMBean
    public int getConsumerAcknowledgeMode() {
        return this.bridgeDefinition.getConsumerAcknowledgeMode();
    }

    @Override // net.timewalker.ffmq3.cluster.bridge.JMSBridgeDefinitionMBean
    public int getProducerDeliveryMode() {
        return this.bridgeDefinition.getProducerDeliveryMode();
    }

    static Class class$(String str) {
        try {
            return Class.forName(str);
        } catch (ClassNotFoundException e) {
            throw new NoClassDefFoundError().initCause(e);
        }
    }

    static {
        Class cls;
        if (class$net$timewalker$ffmq3$cluster$bridge$JMSBridge == null) {
            cls = class$("net.timewalker.ffmq3.cluster.bridge.JMSBridge");
            class$net$timewalker$ffmq3$cluster$bridge$JMSBridge = cls;
        } else {
            cls = class$net$timewalker$ffmq3$cluster$bridge$JMSBridge;
        }
        log = LogFactory.getLog(cls);
    }
}
