package org.apache.storm.jms.trident;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.storm.generated.StreamInfo;
import org.apache.storm.jms.JmsProvider;
import org.apache.storm.jms.JmsTupleProducer;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.OutputFieldsGetter;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.ITridentSpout;
import org.apache.storm.trident.topology.TransactionAttempt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.RotatingMap;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:org/apache/storm/jms/trident/TridentJmsSpout.class */
public class TridentJmsSpout implements ITridentSpout<JmsBatch> {
    public static final String MAX_BATCH_SIZE_CONF = "topology.spout.max.batch.size";
    public static final int DEFAULT_BATCH_SIZE = 1000;
    private static final long serialVersionUID = -3469351154693356655L;
    private static int nameIndex = 1;
    private JmsTupleProducer tupleProducer;
    private JmsProvider jmsProvider;
    private int jmsAcknowledgeMode;
    private String name;

    /* loaded from: input_file:org/apache/storm/jms/trident/TridentJmsSpout$JmsBatchCoordinator.class */
    private class JmsBatchCoordinator implements ITridentSpout.BatchCoordinator<JmsBatch> {
        private final String name;
        private final Logger log = LoggerFactory.getLogger(JmsBatchCoordinator.class);

        JmsBatchCoordinator(String str) {
            this.name = str;
            this.log.info("Created batch coordinator for " + str);
        }

        public JmsBatch initializeTransaction(long j, JmsBatch jmsBatch, JmsBatch jmsBatch2) {
            this.log.debug("Initialise transaction " + j + " for " + this.name);
            return null;
        }

        public void success(long j) {
        }

        public boolean isReady(long j) {
            return true;
        }

        public void close() {
        }
    }

    /* loaded from: input_file:org/apache/storm/jms/trident/TridentJmsSpout$JmsEmitter.class */
    private class JmsEmitter implements ITridentSpout.Emitter<JmsBatch>, MessageListener {
        private final LinkedBlockingQueue<Message> queue;
        private final Connection connection;
        private final Session session;
        private final RotatingMap<Long, List<Message>> batchMessageMap;
        private final long rotateTimeMillis;
        private final int maxBatchSize;
        private final String name;
        private final Logger log = LoggerFactory.getLogger(JmsEmitter.class);
        private long lastRotate;

        JmsEmitter(String str, JmsProvider jmsProvider, JmsTupleProducer jmsTupleProducer, int i, Map<String, Object> map) {
            if (jmsProvider == null) {
                throw new IllegalStateException("JMS provider has not been set.");
            }
            if (jmsTupleProducer == null) {
                throw new IllegalStateException("JMS Tuple Producer has not been set.");
            }
            this.queue = new LinkedBlockingQueue<>();
            this.name = str;
            this.batchMessageMap = new RotatingMap<>(3);
            this.rotateTimeMillis = 1000 * ((Number) map.get("topology.message.timeout.secs")).intValue();
            this.lastRotate = System.currentTimeMillis();
            Number number = (Number) map.get(TridentJmsSpout.MAX_BATCH_SIZE_CONF);
            this.maxBatchSize = number != null ? number.intValue() : 1000;
            try {
                ConnectionFactory connectionFactory = jmsProvider.connectionFactory();
                Destination destination = jmsProvider.destination();
                this.connection = connectionFactory.createConnection();
                this.session = this.connection.createSession(false, i);
                this.session.createConsumer(destination).setMessageListener(this);
                this.connection.start();
                this.log.info("Created JmsEmitter with max batch size " + this.maxBatchSize + " rotate time " + this.rotateTimeMillis + "ms and destination " + destination + " for " + str);
            } catch (Exception e) {
                this.log.warn("Error creating JMS connection.", e);
                throw new IllegalStateException("Could not create JMS connection for spout ", e);
            }
        }

        public void success(TransactionAttempt transactionAttempt) {
            List<Message> list = (List) this.batchMessageMap.remove(transactionAttempt.getTransactionId());
            if (list == null) {
                this.log.warn("No messages found in batch with transaction id " + transactionAttempt.getTransactionId() + "/" + transactionAttempt.getAttemptId());
                return;
            }
            if (!list.isEmpty()) {
                this.log.debug("Success for batch with transaction id " + transactionAttempt.getTransactionId() + "/" + transactionAttempt.getAttemptId() + " for " + this.name);
            }
            for (Message message : list) {
                String str = "UnknownId";
                try {
                    str = message.getJMSMessageID();
                    message.acknowledge();
                    this.log.trace("Acknowledged message " + str);
                } catch (JMSException e) {
                    this.log.warn("Failed to acknowledge message " + str, e);
                }
            }
        }

        private void fail(Long l, List<Message> list) {
            this.log.debug("Failure for batch with transaction id " + l + " for " + this.name);
            if (list == null) {
                this.log.warn("Failed batch has no messages with transaction id " + l);
                return;
            }
            Iterator<Message> it = list.iterator();
            while (it.hasNext()) {
                try {
                    this.log.trace("Failed message " + it.next().getJMSMessageID());
                } catch (JMSException e) {
                    this.log.warn("Could not identify failed message ", e);
                }
            }
        }

        public void close() {
            try {
                this.log.info("Closing JMS connection.");
                this.session.close();
                this.connection.close();
            } catch (JMSException e) {
                this.log.warn("Error closing JMS connection.", e);
            }
        }

        public void emitBatch(TransactionAttempt transactionAttempt, JmsBatch jmsBatch, TridentCollector tridentCollector) {
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - this.lastRotate > this.rotateTimeMillis) {
                Map rotate = this.batchMessageMap.rotate();
                for (Long l : rotate.keySet()) {
                    this.log.warn("TIMED OUT batch with transaction id " + l + " for " + this.name);
                    fail(l, (List) rotate.get(l));
                }
                this.lastRotate = currentTimeMillis;
            }
            if (this.batchMessageMap.containsKey(transactionAttempt.getTransactionId())) {
                this.log.warn("FAILED duplicate batch with transaction id " + transactionAttempt.getTransactionId() + "/" + transactionAttempt.getAttemptId() + " for " + this.name);
                fail(transactionAttempt.getTransactionId(), (List) this.batchMessageMap.get(transactionAttempt.getTransactionId()));
            }
            ArrayList arrayList = new ArrayList();
            int i = 0;
            while (true) {
                if (i >= this.maxBatchSize) {
                    break;
                }
                Message poll = this.queue.poll();
                if (poll == null) {
                    Utils.sleep(50L);
                    break;
                }
                try {
                    if (TridentJmsSpout.this.jmsAcknowledgeMode != 1) {
                        arrayList.add(poll);
                    }
                    tridentCollector.emit(TridentJmsSpout.this.tupleProducer.toTuple(poll));
                } catch (JMSException e) {
                    this.log.warn("Failed to emit message, could not retrieve data for " + this.name + ": " + e);
                }
                i++;
            }
            if (arrayList.isEmpty()) {
                this.log.trace("No items to acknowledge for batch with transaction id " + transactionAttempt.getTransactionId() + "/" + transactionAttempt.getAttemptId() + " for " + this.name);
            } else {
                this.log.debug("Emitting batch with transaction id " + transactionAttempt.getTransactionId() + "/" + transactionAttempt.getAttemptId() + " and size " + arrayList.size() + " for " + this.name);
            }
            this.batchMessageMap.put(transactionAttempt.getTransactionId(), arrayList);
        }

        @Override // javax.jms.MessageListener
        public void onMessage(Message message) {
            try {
                this.log.trace("Queuing msg [" + message.getJMSMessageID() + PropertyAccessor.PROPERTY_KEY_SUFFIX);
            } catch (JMSException e) {
            }
            this.queue.offer(message);
        }
    }

    public TridentJmsSpout() {
        StringBuilder append = new StringBuilder().append("JmsSpout_");
        int i = nameIndex;
        nameIndex = i + 1;
        this.name = append.append(i).toString();
        this.jmsAcknowledgeMode = 1;
    }

    private static String toDeliveryModeString(int i) {
        switch (i) {
            case 1:
                return "AUTO_ACKNOWLEDGE";
            case 2:
                return "CLIENT_ACKNOWLEDGE";
            case 3:
                return "DUPS_OK_ACKNOWLEDGE";
            default:
                throw new IllegalArgumentException("Unknown JMS Acknowledge mode " + i + " (See javax.jms.Session for valid values)");
        }
    }

    public TridentJmsSpout named(String str) {
        this.name = str;
        return this;
    }

    public TridentJmsSpout withJmsProvider(JmsProvider jmsProvider) {
        this.jmsProvider = jmsProvider;
        return this;
    }

    public TridentJmsSpout withTupleProducer(JmsTupleProducer jmsTupleProducer) {
        this.tupleProducer = jmsTupleProducer;
        return this;
    }

    public TridentJmsSpout withJmsAcknowledgeMode(int i) {
        toDeliveryModeString(i);
        this.jmsAcknowledgeMode = i;
        return this;
    }

    public ITridentSpout.BatchCoordinator<JmsBatch> getCoordinator(String str, Map<String, Object> map, TopologyContext topologyContext) {
        return new JmsBatchCoordinator(this.name);
    }

    public ITridentSpout.Emitter<JmsBatch> getEmitter(String str, Map<String, Object> map, TopologyContext topologyContext) {
        return new JmsEmitter(this.name, this.jmsProvider, this.tupleProducer, this.jmsAcknowledgeMode, map);
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public Fields getOutputFields() {
        OutputFieldsDeclarer outputFieldsGetter = new OutputFieldsGetter();
        this.tupleProducer.declareOutputFields(outputFieldsGetter);
        StreamInfo streamInfo = (StreamInfo) outputFieldsGetter.getFieldsDeclaration().get("default");
        if (streamInfo == null) {
            throw new IllegalArgumentException("Jms Tuple producer has not declared output fields for the default stream");
        }
        return new Fields(streamInfo.get_output_fields());
    }
}
