package io.elastic.sailor.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import io.elastic.api.Function;
import io.elastic.sailor.ContainerContext;
import io.elastic.sailor.ExecutionContext;
import io.elastic.sailor.ExecutionStats;
import io.elastic.sailor.MessageProcessor;
import io.elastic.sailor.Step;
import io.elastic.sailor.Utils;
import java.io.IOException;
import java.nio.charset.Charset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:io/elastic/sailor/impl/MessageConsumer.class */
public class MessageConsumer extends DefaultConsumer {
    private static final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
    private final CryptoServiceImpl cipher;
    private final MessageProcessor processor;
    private final Function function;
    private final Step step;
    private final ContainerContext containerContext;

    public MessageConsumer(Channel channel, CryptoServiceImpl cryptoServiceImpl, MessageProcessor messageProcessor, Function function, Step step, ContainerContext containerContext) {
        super(channel);
        this.cipher = cryptoServiceImpl;
        this.processor = messageProcessor;
        this.function = function;
        this.step = step;
        this.containerContext = containerContext;
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        long deliveryTag = envelope.getDeliveryTag();
        logger.info("Consumer {} received message: deliveryTag={}", str, Long.valueOf(deliveryTag));
        putIntoMDC(basicProperties);
        try {
            ExecutionStats executionStats = null;
            try {
                try {
                    executionStats = this.processor.processMessage(createExecutionContext(bArr, basicProperties), this.function);
                    removeFromMDC("threadId");
                    removeFromMDC("messageId");
                    removeFromMDC("parentMessageId");
                    ackOrReject(executionStats, deliveryTag);
                } catch (Throwable th) {
                    removeFromMDC("threadId");
                    removeFromMDC("messageId");
                    removeFromMDC("parentMessageId");
                    ackOrReject(executionStats, deliveryTag);
                    throw th;
                }
            } catch (Exception e) {
                logger.error("Failed to process message for delivery tag:" + deliveryTag, e);
                removeFromMDC("threadId");
                removeFromMDC("messageId");
                removeFromMDC("parentMessageId");
                ackOrReject(executionStats, deliveryTag);
            }
        } catch (Exception e2) {
            logger.info("Failed to parse message to process {}", Long.valueOf(deliveryTag), e2);
            getChannel().basicReject(deliveryTag, false);
        }
    }

    private void putIntoMDC(AMQP.BasicProperties basicProperties) {
        String threadId = Utils.getThreadId(basicProperties);
        Object headerValue = getHeaderValue(basicProperties, "messageId");
        Object headerValue2 = getHeaderValue(basicProperties, "parentMessageId");
        MDC.put("threadId", threadId);
        MDC.put("messageId", headerValue.toString());
        MDC.put("parentMessageId", headerValue2.toString());
        logger.info("messageId={}, parentMessageId={}, threadId={}", new Object[]{headerValue, headerValue2, threadId});
    }

    private static void removeFromMDC(String str) {
        try {
            MDC.remove(str);
        } catch (Exception e) {
            logger.warn("Failed to remove {} from MDC", str, e);
        }
    }

    private ExecutionContext createExecutionContext(byte[] bArr, AMQP.BasicProperties basicProperties) {
        return new ExecutionContext(this.step, Utils.createMessage(this.cipher.decryptMessageContent(new String(bArr, Charset.forName("UTF-8")))), basicProperties, this.containerContext);
    }

    private void ackOrReject(ExecutionStats executionStats, long j) throws IOException {
        logger.info("Execution stats: {}", executionStats);
        if (executionStats == null || executionStats.getErrorCount() > 0) {
            logger.info("Reject received messages {}", Long.valueOf(j));
            getChannel().basicReject(j, false);
        } else {
            logger.info("Acknowledging received messages {}", Long.valueOf(j));
            getChannel().basicAck(j, true);
        }
    }

    private Object getHeaderValue(AMQP.BasicProperties basicProperties, String str) {
        return basicProperties.getHeaders().getOrDefault(str, "unknown");
    }
}
