package com.oracle.coherence.patterns.eventdistribution.distributors.jms;

import com.oracle.coherence.common.events.Event;
import com.oracle.coherence.common.tuples.Pair;
import com.oracle.coherence.patterns.eventdistribution.EventChannelController;
import com.oracle.coherence.patterns.eventdistribution.EventChannelNotReadyException;
import com.oracle.coherence.patterns.eventdistribution.EventDistributor;
import com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController;
import com.oracle.coherence.patterns.messaging.Subscription;
import com.tangosol.coherence.config.ParameterList;
import com.tangosol.coherence.config.builder.ParameterizedBuilder;
import com.tangosol.config.expression.ParameterResolver;
import com.tangosol.io.Serializer;
import com.tangosol.net.CacheFactory;
import com.tangosol.util.Binary;
import com.tangosol.util.processor.UpdaterProcessor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

/* loaded from: input_file:com/oracle/coherence/patterns/eventdistribution/distributors/jms/JMSEventChannelController.class */
public class JMSEventChannelController extends AbstractEventChannelController<Message> implements JMSEventChannelControllerMBean {
    private static Logger logger = Logger.getLogger(JMSEventChannelController.class.getName());
    private ConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private TopicSubscriber subscriber;

    public JMSEventChannelController(EventDistributor.Identifier identifier, EventChannelController.Identifier identifier2, EventChannelController.Dependencies dependencies, ClassLoader classLoader, ParameterResolver parameterResolver, ParameterizedBuilder<Serializer> parameterizedBuilder, ParameterizedBuilder<ConnectionFactory> parameterizedBuilder2) {
        super(identifier, identifier2, dependencies, classLoader, parameterResolver, parameterizedBuilder);
        this.connection = null;
        this.session = null;
        this.subscriber = null;
        this.connectionFactory = (ConnectionFactory) parameterizedBuilder2.realize(parameterResolver, classLoader, (ParameterList) null);
    }

    private void ensureConnected() throws JMSException {
        if (this.connection == null) {
            this.connection = this.connectionFactory.createConnection();
            this.connection.setClientID(this.controllerIdentifier.getExternalName());
            this.connection.start();
        }
        if (this.session == null) {
            this.session = this.connection.createSession(false, 2);
        }
        if (this.subscriber == null) {
            if (logger.isLoggable(Level.FINE)) {
                logger.log(Level.FINE, "Attempting to create topic {0} for {1}", new Object[]{this.distributorIdentifier.getExternalName(), this.distributorIdentifier});
            }
            Topic createTopic = this.session.createTopic(this.distributorIdentifier.getExternalName());
            if (logger.isLoggable(Level.FINE)) {
                logger.log(Level.FINE, "Attempting to create durable subscription for {0} in {1}", new Object[]{this.controllerIdentifier, this.distributorIdentifier});
            }
            this.subscriber = this.session.createDurableSubscriber(createTopic, this.controllerIdentifier.getExternalName(), String.format("symbolicname = '%s'", this.distributorIdentifier.getSymbolicName()), false);
        }
    }

    private void ensureDisconnected() {
        if (this.subscriber != null) {
            try {
                this.subscriber.close();
            } catch (JMSException e) {
            } finally {
                this.subscriber = null;
            }
        }
        if (this.session != null) {
            try {
                this.session.close();
            } catch (JMSException e2) {
            } finally {
                this.session = null;
            }
        }
        if (this.connection != null) {
            try {
                this.connection.stop();
                this.connection.close();
                this.connection = null;
            } catch (JMSException e3) {
                this.connection = null;
            } catch (Throwable th) {
                this.connection = null;
                throw th;
            }
        }
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController
    protected void internalEnable() {
        try {
            ensureConnected();
        } catch (JMSException e) {
            logger.log(Level.SEVERE, "Failed to enable the EventChannelController {0} due to {1}.", new Object[]{this.controllerDependencies.getChannelName(), e});
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController
    protected void internalDisable() {
        try {
            ensureConnected();
            if (this.subscriber != null) {
                try {
                    this.subscriber.close();
                    this.subscriber = null;
                } catch (JMSException e) {
                    this.subscriber = null;
                } catch (Throwable th) {
                    this.subscriber = null;
                    throw th;
                }
            }
            this.session.unsubscribe(this.controllerIdentifier.getExternalName());
            ensureDisconnected();
        } catch (JMSException e2) {
            logger.log(Level.SEVERE, "Failed to disable the EventChannelController {0} due to {1}.", new Object[]{this.controllerDependencies.getChannelName(), e2});
            throw new RuntimeException((Throwable) e2);
        }
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController
    protected void internalDrain() {
        internalDisable();
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController
    protected void internalStart() throws EventChannelNotReadyException {
        try {
            ensureConnected();
            this.channel.connect(this.distributorIdentifier, this.controllerIdentifier);
        } catch (JMSException e) {
            if (logger.isLoggable(Level.FINER)) {
                logger.log(Level.FINER, "The EventChannelController {0} for {1} could not establish subscription {2} to the JMS infrastructure.  Deferring (re)start for {3} ms", new Object[]{this.controllerDependencies.getChannelName(), this.distributorIdentifier, this.controllerIdentifier.getExternalName(), Long.valueOf(this.controllerDependencies.getRestartDelay())});
                logger.log(Level.FINER, "Exception was:", e);
            }
            ensureDisconnected();
            throw new EventChannelNotReadyException(this.channel);
        }
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController
    protected void internalStop() {
        ensureDisconnected();
        this.channel.disconnect();
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController
    protected Pair<List<Event>, Message> getEventsToDistribute() {
        try {
            ArrayList arrayList = new ArrayList(this.controllerDependencies.getBatchSize());
            BytesMessage bytesMessage = null;
            do {
                BytesMessage receiveNoWait = this.subscriber.receiveNoWait();
                if (receiveNoWait != null && (receiveNoWait instanceof BytesMessage)) {
                    bytesMessage = receiveNoWait;
                    BytesMessage bytesMessage2 = receiveNoWait;
                    try {
                        byte[] bArr = new byte[bytesMessage2.readInt()];
                        bytesMessage2.readBytes(bArr);
                        Object deserialize = getSerializer().deserialize(new Binary(bArr).getBufferInput());
                        if (deserialize != null && (deserialize instanceof Event)) {
                            arrayList.add((Event) deserialize);
                        }
                    } catch (IOException e) {
                        logger.log(Level.SEVERE, "Failed to retrieve an event from distributor {0} channel {1}.Due to a serialization failure {2}", new Object[]{getEventDistributorName(), getEventChannelControllerName(), e});
                        e.printStackTrace();
                    }
                }
                if (receiveNoWait == null) {
                    break;
                }
            } while (arrayList.size() < this.controllerDependencies.getBatchSize());
            return new Pair<>(arrayList, bytesMessage);
        } catch (JMSException e2) {
            throw new RuntimeException((Throwable) e2);
        }
    }

    /* renamed from: acknowledgeDistributedEvents, reason: avoid collision after fix types in other method */
    protected void acknowledgeDistributedEvents2(List<Event> list, Message message) {
        try {
            message.acknowledge();
        } catch (JMSException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController, com.oracle.coherence.patterns.eventdistribution.EventChannelControllerMBean
    public void setBatchDistributionDelay(long j) {
        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Changing Batch Distribution Delay from {0} ms to {1} ms", new Object[]{Long.valueOf(this.controllerDependencies.getBatchDistributionDelay()), Long.valueOf(j)});
        }
        super.setBatchDistributionDelay(j);
        CacheFactory.getCache("coherence.live.objects.distributed").invoke(this.distributorIdentifier.getSymbolicName() + ":" + this.controllerIdentifier.getSymbolicName(), new UpdaterProcessor("getDependencies.setBatchDistributionDelay", Long.valueOf(j)));
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController, com.oracle.coherence.patterns.eventdistribution.EventChannelControllerMBean
    public void setBatchSize(int i) {
        int i2 = i <= 0 ? 1 : i;
        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Changing Batch Size from {0} to {1}", new Object[]{Integer.valueOf(this.controllerDependencies.getBatchSize()), Integer.valueOf(i2)});
        }
        super.setBatchSize(i2);
        CacheFactory.getCache(Subscription.CACHENAME);
        CacheFactory.getCache("coherence.live.objects.distributed").invoke(this.distributorIdentifier.getSymbolicName() + ":" + this.controllerIdentifier.getSymbolicName(), new UpdaterProcessor("getDependencies.setBatchSize", Long.valueOf(i2)));
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController, com.oracle.coherence.patterns.eventdistribution.EventChannelControllerMBean
    public void setRestartDelay(long j) {
        if (logger.isLoggable(Level.INFO)) {
            logger.log(Level.INFO, "Changing Restart Delay from {0} ms to {1} ms", new Object[]{Long.valueOf(this.controllerDependencies.getBatchDistributionDelay()), Long.valueOf(j)});
        }
        super.setRestartDelay(j);
        CacheFactory.getCache("coherence.live.objects.distributed").invoke(this.distributorIdentifier.getSymbolicName() + ":" + this.controllerIdentifier.getSymbolicName(), new UpdaterProcessor("getDependencies.setRestartDelay", Long.valueOf(j)));
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController, com.oracle.coherence.patterns.eventdistribution.EventChannelController
    public void setStartingMode(EventChannelController.Mode mode) {
        if (mode != getStartingMode()) {
            if (logger.isLoggable(Level.INFO)) {
                logger.log(Level.INFO, "Changing Starting Mode from {0} to {1}", new Object[]{this.controllerDependencies.getStartingMode(), mode});
            }
            super.setStartingMode(mode);
            CacheFactory.getCache("coherence.live.objects.distributed").invoke(this.distributorIdentifier.getSymbolicName() + ":" + this.controllerIdentifier.getSymbolicName(), new UpdaterProcessor("getDependencies.setStartingMode", mode));
        }
    }

    @Override // com.oracle.coherence.patterns.eventdistribution.distributors.AbstractEventChannelController
    protected /* bridge */ /* synthetic */ void acknowledgeDistributedEvents(List list, Message message) {
        acknowledgeDistributedEvents2((List<Event>) list, message);
    }
}
