package org.cyclades.nyxlet.servicebrokernyxlet.message.impl.activemq.consumer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.cyclades.engine.MetaTypeEnum;
import org.cyclades.nyxlet.servicebrokernyxlet.message.impl.activemq.ConnectionResource;
import org.cyclades.nyxlet.servicebrokernyxlet.message.impl.activemq.MessageUtils;

/* loaded from: input_file:WEB-INF/nyxlets/servicebroker.nyxlet:org/cyclades/nyxlet/servicebrokernyxlet/message/impl/activemq/consumer/ActiveMQMergingDefaultConsumer.class */
public class ActiveMQMergingDefaultConsumer extends TimerTask implements ActiveMQConsumer {
    private Timer timer;
    private ConnectionResource connectionResource;
    private Session session;
    private MessageConsumer consumer;
    private MessageProducer producer;
    private long timerDelayMills = 1000;
    private long timerPeriodMills = 10000;
    private long accumulationWaitMills = 10000;
    private int minMessages = 10;
    private MessageListAggregate messages = new MessageListAggregate();
    private int replyToMessageDeliveryMode = -1;

    public ActiveMQMergingDefaultConsumer(ConnectionResource connectionResource) throws IOException {
        this.connectionResource = connectionResource;
    }

    @Override // org.cyclades.nyxlet.servicebrokernyxlet.message.impl.activemq.consumer.ActiveMQConsumer
    public ActiveMQMergingDefaultConsumer init(Map<String, String> map) throws Exception {
        try {
            if (map.containsKey("timer_delay_mills")) {
                this.timerDelayMills = Long.parseLong(map.get("timer_delay_mills"));
            }
            if (map.containsKey("timer_period_mills")) {
                this.timerPeriodMills = Long.parseLong(map.get("timer_period_mills"));
            }
            if (map.containsKey("accumulation_wait_mills")) {
                this.accumulationWaitMills = Long.parseLong(map.get("accumulation_wait_mills"));
            }
            if (map.containsKey("min_messages")) {
                this.minMessages = Integer.parseInt(map.get("min_messages"));
            }
            if (map.containsKey("replyto_message_delivery_mode")) {
                this.replyToMessageDeliveryMode = Integer.parseInt(map.get("replyto_message_delivery_mode"));
            }
            this.session = this.connectionResource.getConnection().createSession(false, 2);
            this.consumer = this.session.createConsumer(this.session.createQueue(this.connectionResource.getQueueName()));
            this.producer = this.session.createProducer(null);
            if (this.replyToMessageDeliveryMode > -1) {
                this.producer.setDeliveryMode(this.replyToMessageDeliveryMode);
            }
            this.timer = new Timer();
            this.timer.schedule(this, this.timerDelayMills, this.timerPeriodMills);
            return this;
        } catch (Exception e) {
            throw new Exception("ActiveMQMergingDefaultConsumer.init: " + e);
        }
    }

    @Override // org.cyclades.nyxlet.servicebrokernyxlet.message.impl.activemq.consumer.ActiveMQConsumer
    public void destroy() throws Exception {
        try {
            this.session.close();
        } catch (Exception e) {
        }
        try {
            this.consumer.close();
        } catch (Exception e2) {
        }
        try {
            this.producer.close();
        } catch (Exception e3) {
        }
        try {
            this.timer.cancel();
        } catch (Exception e4) {
            throw new Exception("ActiveMQMergingDefaultConsumer.destroy: " + e4);
        }
    }

    @Override // java.util.TimerTask, java.lang.Runnable
    public void run() {
        try {
            accumulateMessages();
        } catch (Exception e) {
            this.connectionResource.getCallBackServiceInstance().logError("ActiveMQMergingDefaultConsumer.run: " + e, new Throwable[0]);
        }
    }

    private synchronized void accumulateMessages() throws Exception {
        byte[] bytes;
        this.connectionResource.getConnection();
        try {
            javax.jms.Message receive = this.consumer.receive(100L);
            while (receive != null) {
                if (receive instanceof BytesMessage) {
                    bytes = MessageUtils.readBytes((BytesMessage) receive);
                } else {
                    if (!(receive instanceof TextMessage)) {
                        throw new UnsupportedOperationException("Message type not supported: " + receive.getClass().getName());
                    }
                    bytes = ((TextMessage) receive).getText().getBytes();
                }
                this.messages.add(new Message(bytes, receive));
                if (accumulationExpired()) {
                    break;
                } else {
                    receive = this.consumer.receive(100L);
                }
            }
            if (accumulationExpired()) {
                byte[] processMessages = processMessages();
                ackMessages();
                releaseAlternateFormatMessages();
                if (this.connectionResource.hasResponseProcessor()) {
                    this.connectionResource.fireResponseProcessor(processMessages, null);
                }
            }
        } catch (Exception e) {
            throw new Exception("ActiveMQMergingDefaultConsumer.accumulateMessage: " + e);
        }
    }

    public synchronized void releaseAlternateFormatMessages() throws Exception {
        try {
            Iterator<Message> it = this.messages.getAlternateFormatMessages().iterator();
            while (it.hasNext()) {
                this.producer.send(this.session.createQueue(this.connectionResource.getQueueName()), it.next().jmsMessage);
            }
            this.messages.getAlternateFormatMessages().clear();
        } catch (Exception e) {
            throw new Exception("ActiveMQMergingDefaultConsumer.releaseAlternateFormatMessages: " + e);
        }
    }

    public synchronized byte[] processMessages() throws Exception {
        try {
            LinkedHashMap linkedHashMap = new LinkedHashMap();
            ArrayList arrayList = new ArrayList();
            for (Message message : this.messages.getMessages()) {
                arrayList.add(new String(message.body, "UTF-8"));
                Destination jMSReplyTo = message.jmsMessage.getJMSReplyTo();
                if (jMSReplyTo != null) {
                    linkedHashMap.put(jMSReplyTo, 1);
                }
            }
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            this.connectionResource.getCallBackServiceInstance().processXSTROMAMessagePayloads(byteArrayOutputStream, arrayList, this.messages.messageStartsWith.charValue() == '{' ? MetaTypeEnum.JSON : MetaTypeEnum.XML);
            byte[] byteArray = byteArrayOutputStream.toByteArray();
            if (linkedHashMap.size() > 0) {
                for (Map.Entry entry : linkedHashMap.entrySet()) {
                    BytesMessage createBytesMessage = this.session.createBytesMessage();
                    createBytesMessage.writeBytes(byteArray);
                    this.producer.send((Destination) entry.getKey(), createBytesMessage);
                }
            }
            return byteArray;
        } catch (Exception e) {
            throw new Exception("ActiveMQMergingDefaultConsumer.processMessage: " + e);
        }
    }

    public synchronized void ackMessages() throws Exception {
        try {
            Iterator<Message> it = this.messages.getMessages().iterator();
            while (it.hasNext()) {
                it.next().jmsMessage.acknowledge();
            }
            this.messages.reset();
        } catch (Exception e) {
            throw new Exception("ActiveMQMergingDefaultConsumer.processMessage: " + e);
        }
    }

    private synchronized boolean accumulationExpired() {
        return (this.messages.getMessages().size() > 0 && this.messages.getMessages().get(0).timeStamp + this.accumulationWaitMills < System.currentTimeMillis()) || this.messages.getMessages().size() >= this.minMessages;
    }

    @Override // org.cyclades.nyxlet.servicebrokernyxlet.message.impl.activemq.consumer.ActiveMQConsumer
    public /* bridge */ /* synthetic */ ActiveMQConsumer init(Map map) throws Exception {
        return init((Map<String, String>) map);
    }
}
