package org.apache.flink.streaming.connectors.activemq;

import java.util.HashMap;
import java.util.List;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.activemq.internal.AMQExceptionListener;
import org.apache.flink.streaming.connectors.activemq.internal.AMQUtil;
import org.apache.flink.streaming.connectors.activemq.internal.RunningChecker;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/activemq/AMQSource.class */
/**
 * Flink source that consumes {@link BytesMessage}s from an ActiveMQ destination and emits the
 * records produced by the configured {@link DeserializationSchema}.
 *
 * <p>When checkpointing is enabled the JMS session is opened in individual-acknowledge mode:
 * every received message is remembered under its JMS message id and is only acknowledged once
 * {@link #acknowledgeIDs(long, List)} is called for that id. Without checkpointing the session
 * auto-acknowledges messages.
 *
 * @param <OUT> type of the records emitted by this source
 */
public class AMQSource<OUT> extends MessageAcknowledgingSourceBase<OUT, String> implements ResultTypeQueryable<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(AMQSource.class);

    /**
     * ActiveMQ-specific session mode in which each message is acknowledged on its own
     * (numeric value of {@code ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE}).
     */
    private static final int INDIVIDUAL_ACKNOWLEDGE = 4;

    /** How long a single {@code receive} call blocks before the running flag is re-checked. */
    private static final long RECEIVE_TIMEOUT_MILLIS = 1000L;

    private final ActiveMQConnectionFactory connectionFactory;
    private final String destinationName;
    private final DeserializationSchema<OUT> deserializationSchema;
    private final DestinationType destinationType;
    private boolean logFailuresOnly;
    private RunningChecker runningChecker;
    private transient Connection connection;
    private transient Session session;
    private transient MessageConsumer consumer;
    private boolean autoAck;
    // Messages that were emitted but not yet acknowledged, keyed by JMS message id.
    private HashMap<String, Message> unacknowledgedMessages;
    private AMQExceptionListener exceptionListener;

    /**
     * Creates a source from the given configuration.
     *
     * @param aMQSourceConfig supplies the connection factory, destination name and type,
     *     deserialization schema and running checker
     */
    public AMQSource(AMQSourceConfig<OUT> aMQSourceConfig) {
        super(String.class);
        this.logFailuresOnly = false;
        this.unacknowledgedMessages = new HashMap<>();
        this.connectionFactory = aMQSourceConfig.getConnectionFactory();
        this.destinationName = aMQSourceConfig.getDestinationName();
        this.deserializationSchema = aMQSourceConfig.getDeserializationSchema();
        this.runningChecker = aMQSourceConfig.getRunningChecker();
        this.destinationType = aMQSourceConfig.getDestinationType();
    }

    /**
     * Chooses between failing and merely logging when closing or acknowledging fails.
     *
     * @param z {@code true} to log JMS failures instead of throwing them
     */
    public void setLogFailuresOnly(boolean z) {
        this.logFailuresOnly = z;
    }

    /**
     * Replaces the exception listener polled by {@link #run}. Note that {@link #open} installs
     * a fresh listener, so this only has an effect when called after {@code open}.
     */
    void setExceptionListener(AMQExceptionListener aMQExceptionListener) {
        this.exceptionListener = aMQExceptionListener;
    }

    /**
     * Connects to the broker, creates the session and consumer, and marks the source as running.
     *
     * @param configuration passed through to the superclass
     * @throws Exception if the superclass fails to open or any JMS call fails
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.connection = this.connectionFactory.createConnection();
        this.connection.start();
        this.exceptionListener = new AMQExceptionListener(LOG, this.logFailuresOnly);
        this.connection.setExceptionListener(this.exceptionListener);

        // Manual acknowledgement is only needed when checkpoints drive the acknowledgements.
        final int acknowledgeMode;
        if (getRuntimeContext() instanceof StreamingRuntimeContext
                && ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
            this.autoAck = false;
            acknowledgeMode = INDIVIDUAL_ACKNOWLEDGE;
        } else {
            this.autoAck = true;
            acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
        }

        this.session = this.connection.createSession(false, acknowledgeMode);
        this.consumer = this.session.createConsumer(
                AMQUtil.getDestination(this.session, this.destinationType, this.destinationName));
        this.runningChecker.setIsRunning(true);
    }

    /**
     * Closes the consumer, session and connection, attempting all three even if one fails.
     * Resources that were never created (for example because {@link #open} failed part-way)
     * are skipped.
     *
     * @throws RuntimeException wrapping the first JMS close failure when failures are not
     *     log-only; later failures are attached as suppressed exceptions
     * @throws Exception if the superclass fails to close
     */
    public void close() throws Exception {
        super.close();
        RuntimeException failure = null;
        if (this.consumer != null) {
            try {
                this.consumer.close();
            } catch (JMSException e) {
                failure = handleCloseFailure(failure, "Failed to close ActiveMQ consumer", e);
            }
        }
        if (this.session != null) {
            try {
                this.session.close();
            } catch (JMSException e) {
                failure = handleCloseFailure(failure, "Failed to close ActiveMQ session", e);
            }
        }
        if (this.connection != null) {
            try {
                this.connection.close();
            } catch (JMSException e) {
                failure = handleCloseFailure(failure, "Failed to close ActiveMQ connection", e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Logs a close failure, or folds it into the exception that {@link #close} will throw. */
    private RuntimeException handleCloseFailure(RuntimeException pending, String description, JMSException cause) {
        if (this.logFailuresOnly) {
            LOG.error(description, cause);
            return pending;
        }
        RuntimeException wrapped = new RuntimeException(description, cause);
        if (pending == null) {
            return wrapped;
        }
        pending.addSuppressed(wrapped);
        return pending;
    }

    /**
     * Acknowledges the messages with the given JMS ids and forgets them. Unknown ids are logged
     * and skipped. Processing stops at the first id whose acknowledgement fails.
     *
     * @param j checkpoint id (not used by this implementation)
     * @param list JMS message ids to acknowledge
     * @throws RuntimeException wrapping the {@link JMSException} when failures are not log-only
     */
    protected void acknowledgeIDs(long j, List<String> list) {
        try {
            for (String messageId : list) {
                Message pendingMessage = this.unacknowledgedMessages.get(messageId);
                if (pendingMessage == null) {
                    LOG.warn("Tried to acknowledge unknown ActiveMQ message id: {}", messageId);
                    continue;
                }
                pendingMessage.acknowledge();
                // Only forget the message once the broker accepted the acknowledgement.
                this.unacknowledgedMessages.remove(messageId);
            }
        } catch (JMSException e) {
            if (!this.logFailuresOnly) {
                throw new RuntimeException("Failed to acknowledge ActiveMQ message", e);
            }
            LOG.error("Failed to acknowledge ActiveMQ message", e);
        }
    }

    /**
     * Polls the consumer until the running checker reports that the source was cancelled,
     * emitting one record per received {@link BytesMessage}.
     *
     * @param sourceContext context used to emit records and to obtain the checkpoint lock
     * @throws Exception if the exception listener reports a failure, or receiving or
     *     deserializing a message fails
     */
    public void run(SourceFunction.SourceContext<OUT> sourceContext) throws Exception {
        while (this.runningChecker.isRunning()) {
            this.exceptionListener.checkErroneous();

            Message received = this.consumer.receive(RECEIVE_TIMEOUT_MILLIS);
            if (received == null) {
                // Receive timed out: nothing to emit, loop to re-check the running flag.
                continue;
            }
            if (!(received instanceof BytesMessage)) {
                // Skip unsupported message kinds instead of terminating the whole source.
                LOG.warn("Active MQ source received non bytes message: {}", received);
                continue;
            }

            BytesMessage bytesMessage = (BytesMessage) received;
            byte[] payload = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(payload);
            OUT record = this.deserializationSchema.deserialize(payload);

            // Emit and register the id under the checkpoint lock so both happen atomically
            // with respect to checkpoints.
            synchronized (sourceContext.getCheckpointLock()) {
                sourceContext.collect(record);
                if (!this.autoAck) {
                    String messageId = bytesMessage.getJMSMessageID();
                    addId(messageId);
                    this.unacknowledgedMessages.put(messageId, bytesMessage);
                }
            }
        }
    }

    /** Asks the {@link #run} loop to stop; it exits after the current receive call returns. */
    public void cancel() {
        this.runningChecker.setIsRunning(false);
    }

    /** Returns the type information reported by the deserialization schema. */
    public TypeInformation<OUT> getProducedType() {
        return this.deserializationSchema.getProducedType();
    }
}
