package org.apache.streampipes.messaging.jms;

import java.io.Serializable;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.grounding.JmsTransportProtocol;
import org.apache.streampipes.model.grounding.TransportProtocol;

/* loaded from: input_file:org/apache/streampipes/messaging/jms/ActiveMQConsumer.class */
/**
 * Event consumer that subscribes to an ActiveMQ topic via JMS and hands the raw payload of every
 * received ActiveMQ bytes message to an {@link InternalEventProcessor}.
 *
 * <p>Usage: call {@link #connect(JmsTransportProtocol, InternalEventProcessor)} to subscribe and
 * {@link #disconnect()} (or {@link #close()}) to release the consumer and session again.
 */
public class ActiveMQConsumer extends ActiveMQConnectionProvider implements EventConsumer<JmsTransportProtocol>, AutoCloseable, Serializable {
    private Session session;
    private MessageConsumer consumer;
    private InternalEventProcessor<byte[]> eventProcessor;
    private Boolean connected = false;

    /**
     * Registers the message listener on the current consumer.
     *
     * <p>Only {@link ActiveMQBytesMessage} instances are forwarded; checking for the concrete type
     * (rather than the {@code BytesMessage} interface) guarantees the cast below cannot fail.
     *
     * @throws JMSException if the listener cannot be installed; propagated so that
     *                      {@code connect} does not report a connection that receives nothing
     */
    private void initListener() throws JMSException {
        this.consumer.setMessageListener(message -> {
            if (message instanceof ActiveMQBytesMessage) {
                // NOTE(review): getData() exposes the backing array of the content sequence;
                // confirm that offset/length always span the whole array for incoming messages.
                this.eventProcessor.onEvent(((ActiveMQBytesMessage) message).getContent().getData());
            }
        });
    }

    /**
     * Opens a session on the broker described by the given protocol, subscribes to its topic and
     * starts forwarding received payloads to the given processor.
     *
     * @param jmsTransportProtocol   protocol settings providing the broker URL and the topic name
     * @param internalEventProcessor receiver of the raw message payloads
     * @throws SpRuntimeException if the session, the consumer or the listener cannot be set up;
     *                            the underlying {@link JMSException} is attached as cause
     */
    public void connect(JmsTransportProtocol jmsTransportProtocol, InternalEventProcessor<byte[]> internalEventProcessor) throws SpRuntimeException {
        String makeActiveMqUrl = ActiveMQUtils.makeActiveMqUrl(jmsTransportProtocol);
        // Track what this attempt created so a failure only cleans up its own resources.
        Session newSession = null;
        MessageConsumer newConsumer = null;
        try {
            this.eventProcessor = internalEventProcessor;
            newSession = startJmsConnection(makeActiveMqUrl).createSession(false, Session.AUTO_ACKNOWLEDGE);
            this.session = newSession;
            newConsumer = newSession.createConsumer(newSession.createTopic(jmsTransportProtocol.getTopicDefinition().getActualTopicName()));
            this.consumer = newConsumer;
            initListener();
            this.connected = true;
        } catch (JMSException e) {
            // Do not leak a half-initialised session/consumer; keep the original error primary.
            JMSException cleanupFailure = closeAll(newConsumer, newSession);
            if (cleanupFailure != null) {
                e.addSuppressed(cleanupFailure);
            }
            throw wrap("could not connect to activemq broker", e);
        }
    }

    /**
     * Closes the consumer and the session. Calling this before a successful {@code connect} is a
     * no-op. The session is closed even if closing the consumer fails.
     *
     * @throws SpRuntimeException if closing the consumer or the session fails; the underlying
     *                            {@link JMSException} is attached as cause and the connected flag
     *                            is left unchanged
     */
    public void disconnect() throws SpRuntimeException {
        JMSException failure = closeAll(this.consumer, this.session);
        if (failure != null) {
            throw wrap("could not disconnect from activemq broker", failure);
        }
        this.connected = false;
    }

    /**
     * @return {@code true} after a successful {@code connect} until the next successful
     *         {@code disconnect}
     */
    public Boolean isConnected() {
        return this.connected;
    }

    /** Delegates to {@link #disconnect()} so the consumer can be used in try-with-resources. */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        disconnect();
    }

    /**
     * Erased-signature variant of {@code connect} that casts its arguments and delegates to the
     * typed overload.
     *
     * <p>NOTE(review): this is marked as a bridge/synthetic method; the compiler normally
     * generates such methods itself, so confirm whether the explicit declaration is needed.
     */
    public /* bridge */ /* synthetic */ void connect(TransportProtocol transportProtocol, InternalEventProcessor internalEventProcessor) throws SpRuntimeException {
        connect((JmsTransportProtocol) transportProtocol, (InternalEventProcessor<byte[]>) internalEventProcessor);
    }

    /**
     * Closes the given consumer and session, skipping {@code null} arguments and always attempting
     * both closes.
     *
     * @return the first failure encountered (with any later failure added as suppressed), or
     *         {@code null} if everything closed cleanly
     */
    private static JMSException closeAll(MessageConsumer consumerToClose, Session sessionToClose) {
        JMSException firstFailure = null;
        if (consumerToClose != null) {
            try {
                consumerToClose.close();
            } catch (JMSException e) {
                firstFailure = e;
            }
        }
        if (sessionToClose != null) {
            try {
                sessionToClose.close();
            } catch (JMSException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        return firstFailure;
    }

    /**
     * Builds an {@link SpRuntimeException} with the given message and attaches the JMS failure as
     * its cause. {@code initCause} is used because the message-only constructor is the only one
     * this file is known to have available.
     */
    private static SpRuntimeException wrap(String message, JMSException cause) {
        SpRuntimeException wrapped = new SpRuntimeException(message);
        wrapped.initCause(cause);
        return wrapped;
    }
}
