package com.hpe.caf.worker.batch;

import com.hpe.caf.api.Codec;
import com.hpe.caf.api.CodecException;
import com.hpe.caf.api.worker.TaskMessage;
import com.hpe.caf.util.rabbitmq.RabbitUtil;
import com.hpe.caf.worker.batch.QueueConsumer;
import com.hpe.caf.worker.queue.rabbit.RabbitWorkerQueueConfiguration;
import com.hpe.caf.worker.testing.SettingsProvider;
import com.hpe.caf.worker.testing.WorkerServices;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/hpe/caf/worker/batch/BatchTargetQueueRetriever.class */
/**
 * Drains a RabbitMQ queue and returns its contents as {@link TaskMessage} objects.
 *
 * <p>A single RabbitMQ {@link Connection} is created when the first instance is constructed and
 * is kept in a static field. Every call to {@link #retrieveMessages(String)} opens its own
 * channel on that connection and closes it again before returning.
 *
 * <p>The class performs no synchronisation: {@link #getInstance()} and the per-call channel field
 * are plain unsynchronised state, so callers should not share the instance between threads.
 */
public class BatchTargetQueueRetriever {
    /** Number of times {@code basicConsume} is attempted before the failure is propagated. */
    private static final int MAX_CONSUME_ATTEMPTS = 5;
    /** Pause between two consecutive {@code basicConsume} attempts, in milliseconds. */
    private static final long CONSUME_RETRY_DELAY_MS = 1000L;
    /** Value passed to {@code QueueConsumer.nextDelivery} while draining the queue. */
    private static final long DELIVERY_TIMEOUT_MS = 100L;

    private static final Logger logger = Logger.getLogger(BatchTargetQueueRetriever.class);
    private static BatchTargetQueueRetriever instance;
    private static Connection connection;
    private Channel channel;
    private final Codec codec;
    private final boolean debugEnabled;

    /**
     * Opens the RabbitMQ connection and reads the codec and the {@code create.debug.message}
     * setting (default {@code false}).
     *
     * @throws Exception if the configuration cannot be obtained or the connection cannot be opened
     */
    protected BatchTargetQueueRetriever() throws Exception {
        createRabbitConnection((RabbitWorkerQueueConfiguration) WorkerServices.getDefault().getConfigurationSource().getConfiguration(RabbitWorkerQueueConfiguration.class));
        this.codec = WorkerServices.getDefault().getCodec();
        this.debugEnabled = SettingsProvider.defaultProvider.getBooleanSetting("create.debug.message", false);
    }

    /**
     * Returns the shared instance, creating it on first use.
     *
     * @return the lazily created shared instance
     * @throws Exception if the instance has to be created and construction fails
     */
    public static BatchTargetQueueRetriever getInstance() throws Exception {
        if (instance == null) {
            instance = new BatchTargetQueueRetriever();
        }
        return instance;
    }

    /**
     * Consumes and acknowledges every message currently deliverable from the given queue,
     * deserialises each body into a {@link TaskMessage}, and finally purges the queue.
     *
     * <p>When the {@code create.debug.message} setting is enabled, the retrieved messages are
     * republished to a queue named {@code <str>-debug} before the purge.
     *
     * @param str name of the queue to drain; it is declared (durable) if necessary
     * @return the deserialised messages in delivery order; empty if nothing was delivered
     * @throws IOException if a channel operation fails, including {@code basicConsume} still
     *         failing after {@value #MAX_CONSUME_ATTEMPTS} attempts
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws CodecException if a message body cannot be deserialised, or a message cannot be
     *         serialised for the debug queue
     */
    public List<TaskMessage> retrieveMessages(String str) throws IOException, InterruptedException, CodecException {
        this.channel = connection.createChannel();
        try {
            this.channel.queueDeclare(str, true, false, false, new HashMap<String, Object>());
            QueueConsumer queueConsumer = new QueueConsumer(this.channel);
            startConsuming(str, queueConsumer);

            List<TaskMessage> messages = new ArrayList<TaskMessage>();
            // Drain until nextDelivery yields null (presumably: nothing arrived within the timeout).
            for (QueueConsumer.Delivery delivery = queueConsumer.nextDelivery(DELIVERY_TIMEOUT_MS);
                    delivery != null;
                    delivery = queueConsumer.nextDelivery(DELIVERY_TIMEOUT_MS)) {
                messages.add(this.codec.deserialise(delivery.getBody(), TaskMessage.class));
                this.channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }

            if (this.debugEnabled) {
                publishDebugMessages(messages, str);
            }
            purgeQueue(str);
            return messages;
        } finally {
            // Also release the channel on failure paths so that a broken channel (with its
            // consumer still attached) is never carried over into the next call.
            closeChannel();
        }
    }

    /**
     * Registers the consumer on the queue, retrying when {@code basicConsume} fails.
     *
     * @param str name of the queue to consume from
     * @param queueConsumer consumer to register with manual acknowledgement
     * @throws IOException the failure of the last attempt, once all attempts are used up
     * @throws InterruptedException if interrupted while pausing between attempts
     */
    private void startConsuming(String str, QueueConsumer queueConsumer) throws IOException, InterruptedException {
        for (int attempt = 1; ; attempt++) {
            try {
                this.channel.basicConsume(str, false, queueConsumer);
                return;
            } catch (IOException e) {
                logger.error(e.getMessage(), e);
                if (attempt >= MAX_CONSUME_ATTEMPTS) {
                    // Out of attempts: propagate instead of silently continuing without a consumer.
                    throw e;
                }
                Thread.sleep(CONSUME_RETRY_DELAY_MS);
            }
        }
    }

    /**
     * Applies the optional {@code docker.host.address} and {@code rabbitmq.node.port} settings on
     * top of the supplied configuration and opens the shared connection.
     *
     * @param rabbitWorkerQueueConfiguration configuration whose RabbitMQ host and port are
     *        overridden in place when the corresponding setting is present
     * @throws IOException if the connection cannot be opened
     * @throws TimeoutException if opening the connection times out
     * @throws NumberFormatException if {@code rabbitmq.node.port} is set but is not an integer
     */
    private void createRabbitConnection(RabbitWorkerQueueConfiguration rabbitWorkerQueueConfiguration) throws IOException, TimeoutException {
        String hostSetting = SettingsProvider.defaultProvider.getSetting("docker.host.address");
        if (hostSetting != null) {
            rabbitWorkerQueueConfiguration.getRabbitConfiguration().setRabbitHost(hostSetting);
        }
        // Parse the port only when it is configured; otherwise keep the configured default
        // rather than failing inside Integer.parseInt.
        String portSetting = SettingsProvider.defaultProvider.getSetting("rabbitmq.node.port");
        if (portSetting != null) {
            rabbitWorkerQueueConfiguration.getRabbitConfiguration().setRabbitPort(Integer.parseInt(portSetting));
        }
        connection = RabbitUtil.createRabbitConnection(rabbitWorkerQueueConfiguration.getRabbitConfiguration());
    }

    /**
     * Purges the given queue and releases the current channel. Failures are logged, not thrown.
     *
     * @param str name of the queue to purge
     */
    private void purgeQueue(String str) {
        try {
            this.channel.queuePurge(str);
        } catch (IOException e) {
            logger.error("Failed to delete queue " + str, e);
        } finally {
            closeChannel();
        }
    }

    /**
     * Closes the current channel, if there is one and it is still open, and clears the field.
     * Failures are logged, not thrown. Calling this with no current channel is a no-op.
     */
    private void closeChannel() {
        Channel toClose = this.channel;
        this.channel = null;
        // Skip channels that are already closed so that close() is not called on them.
        if (toClose == null || !toClose.isOpen()) {
            return;
        }
        try {
            toClose.close();
        } catch (IOException e) {
            logger.error("Failed to close channel", e);
        } catch (TimeoutException e) {
            logger.error("Failed to close channel", e);
        }
    }

    /**
     * Republishes the given messages to the queue named {@code <str>-debug}.
     *
     * @param list messages to serialise and publish
     * @param str name of the source queue; the debug queue name is derived from it
     * @throws IOException if declaring the debug queue or publishing fails
     * @throws CodecException if a message cannot be serialised
     */
    private void publishDebugMessages(List<TaskMessage> list, String str) throws IOException, CodecException {
        String debugQueue = str + "-debug";
        RabbitUtil.declareWorkerQueue(this.channel, debugQueue);
        for (TaskMessage message : list) {
            this.channel.basicPublish("", debugQueue, MessageProperties.PERSISTENT_TEXT_PLAIN, this.codec.serialise(message));
        }
    }
}
