package io.camunda.connector.rabbitmq.inbound;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ValueNode;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import io.camunda.connector.api.error.ConnectorInputException;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.CorrelationFailureHandlingStrategy;
import io.camunda.connector.api.inbound.CorrelationResult;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.Severity;
import io.camunda.connector.rabbitmq.inbound.model.RabbitMqInboundResult;
import io.camunda.connector.rabbitmq.supplier.ObjectMapperSupplier;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.SwitchBootstraps;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import org.apache.commons.text.StringEscapeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/connector/rabbitmq/inbound/RabbitMqConsumer.class */
public class RabbitMqConsumer extends DefaultConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMqConsumer.class);
    private final InboundConnectorContext context;

    public RabbitMqConsumer(Channel channel, InboundConnectorContext inboundConnectorContext) {
        super(channel);
        this.context = inboundConnectorContext;
    }

    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        LOGGER.debug("Received AMQP message with delivery tag {}", Long.valueOf(envelope.getDeliveryTag()));
        this.context.log(Activity.level(Severity.INFO).tag("Message").message("Received AMQP message with delivery tag " + envelope.getDeliveryTag()));
        try {
            handleCorrelationResult(envelope, this.context.correlateWithResult(prepareVariables(str, basicProperties, bArr)));
        } catch (Exception e) {
            LOGGER.debug("NACK (requeue) - unhandled exception", e);
            this.context.log(Activity.level(Severity.WARNING).tag("Message").message("NACK (requeue) - failed to correlate event"));
            getChannel().basicReject(envelope.getDeliveryTag(), true);
        }
    }

    private void handleCorrelationResult(Envelope envelope, CorrelationResult correlationResult) throws IOException {
        Objects.requireNonNull(correlationResult);
        switch ((int) SwitchBootstraps.typeSwitch(MethodHandles.lookup(), "typeSwitch", MethodType.methodType(Integer.TYPE, Object.class, Integer.TYPE), CorrelationResult.Success.class, CorrelationResult.Failure.class).dynamicInvoker().invoke(correlationResult, 0) /* invoke-custom */) {
            case 0:
                LOGGER.debug("ACK - message correlated successfully");
                getChannel().basicAck(envelope.getDeliveryTag(), false);
                return;
            case 1:
                CorrelationResult.Failure failure = (CorrelationResult.Failure) correlationResult;
                InboundConnectorContext inboundConnectorContext = this.context;
                Activity.Builder tag = Activity.level(Severity.WARNING).tag("Message");
                long deliveryTag = envelope.getDeliveryTag();
                failure.message();
                inboundConnectorContext.log(tag.message("Failed to handle AMQP message with delivery tag " + deliveryTag + ", reason: " + inboundConnectorContext));
                CorrelationFailureHandlingStrategy.ForwardErrorToUpstream handlingStrategy = failure.handlingStrategy();
                Objects.requireNonNull(handlingStrategy);
                switch ((int) SwitchBootstraps.typeSwitch(MethodHandles.lookup(), "typeSwitch", MethodType.methodType(Integer.TYPE, Object.class, Integer.TYPE), CorrelationFailureHandlingStrategy.ForwardErrorToUpstream.class, CorrelationFailureHandlingStrategy.Ignore.class).dynamicInvoker().invoke(handlingStrategy, 0) /* invoke-custom */) {
                    case 0:
                        if (handlingStrategy.isRetryable()) {
                            LOGGER.debug("NACK (requeue) - message not correlated");
                            getChannel().basicReject(envelope.getDeliveryTag(), true);
                            return;
                        } else {
                            LOGGER.debug("NACK (drop) - message not correlated");
                            getChannel().basicReject(envelope.getDeliveryTag(), false);
                            return;
                        }
                    case 1:
                        LOGGER.debug("ACK - message ignored");
                        getChannel().basicAck(envelope.getDeliveryTag(), false);
                        return;
                    default:
                        throw new MatchException((String) null, (Throwable) null);
                }
            default:
                throw new MatchException((String) null, (Throwable) null);
        }
    }

    public void handleCancel(String str) {
        LOGGER.info("Consumer cancelled: {}", str);
        try {
            this.context.log(Activity.level(Severity.INFO).tag("Subscription").message("Consumer cancelled: " + str));
            this.context.cancel((Throwable) null);
        } catch (Exception e) {
            this.context.reportHealth(Health.down(e));
            LOGGER.error("Failed to cancel Connector execution: {}", e.getMessage());
        }
    }

    public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
        LOGGER.error("Consumer shutdown: {}", str, shutdownSignalException);
        this.context.log(Activity.level(Severity.INFO).tag("Subscription").message("Consumer shutdown: " + str + String.valueOf(shutdownSignalException)));
        try {
            this.context.cancel(shutdownSignalException);
        } catch (Exception e) {
            LOGGER.error("Failed to cancel Connector execution: {}", e.getMessage());
        }
    }

    private RabbitMqInboundResult prepareVariables(String str, AMQP.BasicProperties basicProperties, byte[] bArr) {
        Object obj;
        try {
            String str2 = new String(bArr, StandardCharsets.UTF_8);
            if (isPayloadJson(str2)) {
                ValueNode valueNode = (JsonNode) ObjectMapperSupplier.instance().readValue(StringEscapeUtils.unescapeJson(str2), JsonNode.class);
                if (valueNode instanceof ValueNode) {
                    ValueNode valueNode2 = valueNode;
                    obj = valueNode2.isBoolean() ? Boolean.valueOf(valueNode2.asBoolean()) : valueNode2.isNumber() ? Double.valueOf(valueNode2.asDouble()) : valueNode2.asText();
                } else {
                    obj = ObjectMapperSupplier.instance().convertValue(valueNode, Object.class);
                }
            } else {
                obj = str2;
            }
            return new RabbitMqInboundResult(new RabbitMqInboundResult.RabbitMqInboundMessage(str, obj, AMQPPropertyUtil.toProperties(basicProperties)));
        } catch (Exception e) {
            LOGGER.error("Failed to parse AMQP message body: {}", e.getMessage());
            throw new ConnectorInputException(e);
        }
    }

    private boolean isPayloadJson(String str) {
        try {
            ObjectMapperSupplier.instance().readValue(StringEscapeUtils.unescapeJson(str), JsonNode.class);
            return true;
        } catch (JsonProcessingException e) {
            return false;
        }
    }
}
