package org.cyclades.nyxlet.servicebrokernyxlet.message.impl.rabbitmq.consumer;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.GetResponse;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import org.cyclades.engine.MetaTypeEnum;
import org.cyclades.nyxlet.servicebrokernyxlet.message.impl.rabbitmq.ConnectionResource;

/* loaded from: input_file:WEB-INF/nyxlets/servicebroker.nyxlet:org/cyclades/nyxlet/servicebrokernyxlet/message/impl/rabbitmq/consumer/RabbitMQMergingDefaultConsumer.class */
public class RabbitMQMergingDefaultConsumer extends TimerTask implements RabbitMQConsumer {
    private Timer timer;
    ConnectionResource connectionResource;
    private static final String MERGE_ON_REPLYTO = "merge_on_replyto";
    private static final String REPLYTO_UNITY_ONLY = "replyto_unity_only";
    private long timerDelayMills = 1000;
    private long timerPeriodMills = 10000;
    private long accumulationWaitMills = 10000;
    private int minMessages = 10;
    MessageListAggregate messages = new MessageListAggregate();
    private int replyToMessageDeliveryMode = -1;
    private boolean mergeOnReplyTo = true;
    private boolean replyToUnityOnly = false;

    public RabbitMQMergingDefaultConsumer(ConnectionResource connectionResource) throws IOException {
        this.connectionResource = connectionResource;
    }

    @Override // org.cyclades.nyxlet.servicebrokernyxlet.message.impl.rabbitmq.consumer.RabbitMQConsumer
    public RabbitMQMergingDefaultConsumer init(Map<String, String> map) throws Exception {
        try {
            if (map.containsKey("timer_delay_mills")) {
                this.timerDelayMills = Long.parseLong(map.get("timer_delay_mills"));
            }
            if (map.containsKey("timer_period_mills")) {
                this.timerPeriodMills = Long.parseLong(map.get("timer_period_mills"));
            }
            if (map.containsKey("accumulation_wait_mills")) {
                this.accumulationWaitMills = Long.parseLong(map.get("accumulation_wait_mills"));
            }
            if (map.containsKey("min_messages")) {
                this.minMessages = Integer.parseInt(map.get("min_messages"));
            }
            if (map.containsKey("replyto_message_delivery_mode")) {
                this.replyToMessageDeliveryMode = Integer.parseInt(map.get("replyto_message_delivery_mode"));
            }
            if (map.containsKey(MERGE_ON_REPLYTO) && map.get(MERGE_ON_REPLYTO) != null) {
                this.mergeOnReplyTo = map.get(MERGE_ON_REPLYTO).equalsIgnoreCase("true");
            }
            if (map.containsKey(REPLYTO_UNITY_ONLY) && map.get(REPLYTO_UNITY_ONLY) != null) {
                this.replyToUnityOnly = map.get(REPLYTO_UNITY_ONLY).equalsIgnoreCase("true");
            }
            this.timer = new Timer();
            this.timer.schedule(this, this.timerDelayMills, this.timerPeriodMills);
            return this;
        } catch (Exception e) {
            throw new Exception("RabbitMQMergingDefaultConsumer.init: " + e);
        }
    }

    @Override // org.cyclades.nyxlet.servicebrokernyxlet.message.impl.rabbitmq.consumer.RabbitMQConsumer
    public void destroy() throws Exception {
        try {
            this.timer.cancel();
        } catch (Exception e) {
            throw new Exception("RabbitMQMergingDefaultConsumer.destroy: " + e);
        }
    }

    @Override // java.util.TimerTask, java.lang.Runnable
    public void run() {
        try {
            accumulateMessages();
        } catch (Exception e) {
            this.connectionResource.getCallBackServiceInstance().logStackTrace(e);
        }
    }

    private synchronized void accumulateMessages() throws Exception {
        try {
            GetResponse basicGet = this.connectionResource.getChannel().basicGet(this.connectionResource.getQueueName(), false);
            while (basicGet != null) {
                this.messages.add(new Message(basicGet.getEnvelope(), basicGet.getProps(), basicGet.getBody()));
                if (accumulationExpired()) {
                    break;
                } else {
                    basicGet = this.connectionResource.getChannel().basicGet(this.connectionResource.getQueueName(), false);
                }
            }
            releaseAlternateFormatMessages();
            if (accumulationExpired()) {
                processMessages();
                ackMessages();
            }
        } catch (Exception e) {
            throw new Exception("RabbitMQMergingDefaultConsumer.accumulateMessage: " + e);
        }
    }

    public synchronized void releaseAlternateFormatMessages() throws Exception {
        try {
            Iterator<Message> it = this.messages.getAlternateFormatMessages().iterator();
            while (it.hasNext()) {
                this.connectionResource.getChannel().basicReject(it.next().envelope.getDeliveryTag(), true);
            }
            this.messages.getAlternateFormatMessages().clear();
        } catch (Exception e) {
            throw new Exception("RabbitMQMergingDefaultConsumer.releaseAlternateFormatMessages: " + e);
        }
    }

    public synchronized void processMessages() throws Exception {
        try {
            if (this.mergeOnReplyTo) {
                Iterator<Map.Entry<String, List<Message>>> it = this.messages.getMessagesByReplyTo().entrySet().iterator();
                while (it.hasNext()) {
                    processMessageBatch(it.next().getValue());
                }
            } else {
                processMessageBatch(this.messages.getMessages());
            }
        } catch (Exception e) {
            throw new Exception("RabbitMQMergingDefaultConsumer.processMessages: " + e);
        }
    }

    private synchronized void processMessageBatch(List<Message> list) throws Exception {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        ArrayList arrayList = new ArrayList();
        for (Message message : list) {
            arrayList.add(new String(message.body));
            String replyTo = message.properties.getReplyTo();
            if (replyTo != null && !replyTo.isEmpty()) {
                linkedHashMap.put(replyTo, 1);
            }
        }
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        this.connectionResource.getCallBackServiceInstance().processXSTROMAMessagePayloads(byteArrayOutputStream, arrayList, this.messages.messageStartsWith.charValue() == '{' ? MetaTypeEnum.JSON : MetaTypeEnum.XML);
        byte[] byteArray = byteArrayOutputStream.toByteArray();
        if (linkedHashMap.size() > 0 && (!this.replyToUnityOnly || (this.replyToUnityOnly && list.size() == 1))) {
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            if (this.replyToMessageDeliveryMode > -1) {
                builder.deliveryMode(Integer.valueOf(this.replyToMessageDeliveryMode));
            }
            AMQP.BasicProperties build = builder.build();
            Iterator it = linkedHashMap.entrySet().iterator();
            while (it.hasNext()) {
                this.connectionResource.getChannel().basicPublish("", (String) ((Map.Entry) it.next()).getKey(), build, byteArray);
            }
        }
        try {
            if (this.connectionResource.hasResponseProcessor()) {
                this.connectionResource.fireResponseProcessor(byteArray, !list.isEmpty() ? list.get(0).body : new byte[0]);
            }
        } catch (Exception e) {
            this.connectionResource.getCallBackServiceInstance().logStackTrace(e);
        }
    }

    public synchronized void ackMessages() throws Exception {
        try {
            Iterator<Message> it = this.messages.getMessages().iterator();
            while (it.hasNext()) {
                this.connectionResource.getChannel().basicAck(it.next().envelope.getDeliveryTag(), false);
            }
            this.messages.reset();
        } catch (Exception e) {
            throw new Exception("RabbitMQMergingDefaultConsumer.ackMessages: " + e);
        }
    }

    private synchronized boolean accumulationExpired() {
        return (this.messages.getMessages().size() > 0 && this.messages.getMessages().get(0).timeStamp + this.accumulationWaitMills < System.currentTimeMillis()) || this.messages.getMessages().size() >= this.minMessages;
    }

    @Override // org.cyclades.nyxlet.servicebrokernyxlet.message.impl.rabbitmq.consumer.RabbitMQConsumer
    public /* bridge */ /* synthetic */ RabbitMQConsumer init(Map map) throws Exception {
        return init((Map<String, String>) map);
    }
}
