package org.springframework.integration.amqp.inbound;

import com.rabbitmq.client.Channel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.amqp.support.EndpointUtils;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.messaging.MessageChannel;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-amqp-5.3.8.RELEASE.jar:org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter.class */
public class AmqpInboundChannelAdapter extends MessageProducerSupport implements OrderlyShutdownCapable {
    private static final ThreadLocal<AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal<>();
    private final AbstractMessageListenerContainer messageListenerContainer;
    private RetryTemplate retryTemplate;
    private RecoveryCallback<?> recoveryCallback;
    private boolean bindSourceMessage;
    private MessageConverter messageConverter = new SimpleMessageConverter();
    private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.inboundMapper();
    private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0);
    private BatchMode batchMode = BatchMode.MESSAGES;

    /* loaded from: input_file:BOOT-INF/lib/spring-integration-amqp-5.3.8.RELEASE.jar:org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter$BatchListener.class */
    protected class BatchListener extends Listener implements ChannelAwareBatchMessageListener {
        private final boolean batchModeMessages;

        protected BatchListener() {
            super();
            this.batchModeMessages = BatchMode.MESSAGES.equals(AmqpInboundChannelAdapter.this.batchMode);
        }

        @Override // org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener, org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener
        public void onMessageBatch(List<Message> list, Channel channel) {
            List<org.springframework.messaging.Message<?>> convertMessages = this.batchModeMessages ? convertMessages(list, channel) : convertPayloads(list, channel);
            if (convertMessages != null) {
                org.springframework.messaging.Message<Object> createMessageFromPayload = createMessageFromPayload(convertMessages, channel, new HashMap(), list.get(list.size() - 1).getMessageProperties().getDeliveryTag());
                try {
                    if (this.retryOps == null) {
                        AmqpInboundChannelAdapter.this.setAttributesIfNecessary(list, createMessageFromPayload);
                        AmqpInboundChannelAdapter.this.sendMessage(createMessageFromPayload);
                    } else {
                        this.retryOps.execute(retryContext -> {
                            StaticMessageHeaderAccessor.getDeliveryAttempt(createMessageFromPayload).incrementAndGet();
                            if (this.batchModeMessages) {
                                ((List) createMessageFromPayload.getPayload()).forEach(message -> {
                                    StaticMessageHeaderAccessor.getDeliveryAttempt(message).incrementAndGet();
                                });
                            }
                            AmqpInboundChannelAdapter.this.setAttributesIfNecessary(list, createMessageFromPayload);
                            AmqpInboundChannelAdapter.this.sendMessage(createMessageFromPayload);
                            return null;
                        }, this.recoverer);
                    }
                } finally {
                    if (this.retryOps == null) {
                        AmqpInboundChannelAdapter.ATTRIBUTES_HOLDER.remove();
                    }
                }
            }
        }

        private List<org.springframework.messaging.Message<?>> convertMessages(List<Message> list, Channel channel) {
            ArrayList arrayList = new ArrayList();
            try {
                list.forEach(message -> {
                    arrayList.add(createMessageFromAmqp(message, channel));
                });
                return arrayList;
            } catch (MessageConversionException e) {
                MessageChannel errorChannel = AmqpInboundChannelAdapter.this.getErrorChannel();
                if (errorChannel == null) {
                    throw e;
                }
                AmqpInboundChannelAdapter.this.setAttributesIfNecessary(list, null);
                AmqpInboundChannelAdapter.this.getMessagingTemplate().send((MessagingTemplate) errorChannel, (org.springframework.messaging.Message<?>) AmqpInboundChannelAdapter.this.buildErrorMessage(null, EndpointUtils.errorMessagePayload(list, channel, this.manualAcks, e)));
                return null;
            }
        }

        private List<?> convertPayloads(List<Message> list, Channel channel) {
            ArrayList arrayList = new ArrayList();
            try {
                list.forEach(message -> {
                    arrayList.add(this.converter.fromMessage(message));
                });
                return arrayList;
            } catch (MessageConversionException e) {
                MessageChannel errorChannel = AmqpInboundChannelAdapter.this.getErrorChannel();
                if (errorChannel == null) {
                    throw e;
                }
                AmqpInboundChannelAdapter.this.setAttributesIfNecessary(list, null);
                AmqpInboundChannelAdapter.this.getMessagingTemplate().send((MessagingTemplate) errorChannel, (org.springframework.messaging.Message<?>) AmqpInboundChannelAdapter.this.buildErrorMessage(null, EndpointUtils.errorMessagePayload(list, channel, this.manualAcks, e)));
                return null;
            }
        }
    }

    /* loaded from: input_file:BOOT-INF/lib/spring-integration-amqp-5.3.8.RELEASE.jar:org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter$BatchMode.class */
    public enum BatchMode {
        MESSAGES,
        EXTRACT_PAYLOADS
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:BOOT-INF/lib/spring-integration-amqp-5.3.8.RELEASE.jar:org/springframework/integration/amqp/inbound/AmqpInboundChannelAdapter$Listener.class */
    public class Listener implements ChannelAwareMessageListener {
        protected final MessageConverter converter;
        protected final boolean manualAcks;
        protected final RetryOperations retryOps;
        protected final RecoveryCallback<?> recoverer;

        protected Listener() {
            this.converter = AmqpInboundChannelAdapter.this.messageConverter;
            this.manualAcks = AcknowledgeMode.MANUAL == AmqpInboundChannelAdapter.this.messageListenerContainer.getAcknowledgeMode();
            this.retryOps = AmqpInboundChannelAdapter.this.retryTemplate;
            this.recoverer = AmqpInboundChannelAdapter.this.recoveryCallback;
        }

        @Override // org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener
        public void onMessage(Message message, Channel channel) {
            try {
                try {
                    if (this.retryOps == null) {
                        createAndSend(message, channel);
                    } else {
                        org.springframework.messaging.Message<Object> createMessageFromAmqp = createMessageFromAmqp(message, channel);
                        this.retryOps.execute(retryContext -> {
                            StaticMessageHeaderAccessor.getDeliveryAttempt(createMessageFromAmqp).incrementAndGet();
                            AmqpInboundChannelAdapter.this.setAttributesIfNecessary(message, createMessageFromAmqp);
                            AmqpInboundChannelAdapter.this.sendMessage(createMessageFromAmqp);
                            return null;
                        }, this.recoverer);
                    }
                    if (this.retryOps == null) {
                        AmqpInboundChannelAdapter.ATTRIBUTES_HOLDER.remove();
                    }
                } catch (MessageConversionException e) {
                    MessageChannel errorChannel = AmqpInboundChannelAdapter.this.getErrorChannel();
                    if (errorChannel == null) {
                        throw e;
                    }
                    AmqpInboundChannelAdapter.this.setAttributesIfNecessary(message, null);
                    AmqpInboundChannelAdapter.this.getMessagingTemplate().send((MessagingTemplate) errorChannel, (org.springframework.messaging.Message<?>) AmqpInboundChannelAdapter.this.buildErrorMessage(null, EndpointUtils.errorMessagePayload(message, channel, this.manualAcks, e)));
                    if (this.retryOps == null) {
                        AmqpInboundChannelAdapter.ATTRIBUTES_HOLDER.remove();
                    }
                }
            } catch (Throwable th) {
                if (this.retryOps == null) {
                    AmqpInboundChannelAdapter.ATTRIBUTES_HOLDER.remove();
                }
                throw th;
            }
        }

        private void createAndSend(Message message, Channel channel) {
            org.springframework.messaging.Message<Object> createMessageFromAmqp = createMessageFromAmqp(message, channel);
            AmqpInboundChannelAdapter.this.setAttributesIfNecessary(message, createMessageFromAmqp);
            AmqpInboundChannelAdapter.this.sendMessage(createMessageFromAmqp);
        }

        protected org.springframework.messaging.Message<Object> createMessageFromAmqp(Message message, Channel channel) {
            Object convertPayload = convertPayload(message);
            Map<String, Object> headersFromRequest = AmqpInboundChannelAdapter.this.headerMapper.toHeadersFromRequest(message.getMessageProperties());
            if (AmqpInboundChannelAdapter.this.bindSourceMessage) {
                headersFromRequest.put(IntegrationMessageHeaderAccessor.SOURCE_DATA, message);
            }
            return createMessageFromPayload(convertPayload, channel, headersFromRequest, message.getMessageProperties().getDeliveryTag());
        }

        protected Object convertPayload(Message message) {
            Object fromMessage;
            if (AmqpInboundChannelAdapter.this.batchingStrategy.canDebatch(message.getMessageProperties())) {
                ArrayList arrayList = new ArrayList();
                AmqpInboundChannelAdapter.this.batchingStrategy.deBatch(message, message2 -> {
                    arrayList.add(this.converter.fromMessage(message2));
                });
                fromMessage = arrayList;
            } else {
                fromMessage = this.converter.fromMessage(message);
            }
            return fromMessage;
        }

        protected org.springframework.messaging.Message<Object> createMessageFromPayload(Object obj, Channel channel, Map<String, Object> map, long j) {
            if (this.manualAcks) {
                map.put(AmqpHeaders.DELIVERY_TAG, Long.valueOf(j));
                map.put(AmqpHeaders.CHANNEL, channel);
            }
            if (AmqpInboundChannelAdapter.this.retryTemplate != null) {
                map.put(IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT, new AtomicInteger());
            }
            return AmqpInboundChannelAdapter.this.getMessageBuilderFactory().withPayload(obj).copyHeaders(map).build();
        }
    }

    public AmqpInboundChannelAdapter(AbstractMessageListenerContainer abstractMessageListenerContainer) {
        Assert.notNull(abstractMessageListenerContainer, "listenerContainer must not be null");
        Assert.isNull(abstractMessageListenerContainer.getMessageListener(), "The listenerContainer provided to an AMQP inbound Channel Adapter must not have a MessageListener configured since the adapter configure its own listener implementation.");
        this.messageListenerContainer = abstractMessageListenerContainer;
        this.messageListenerContainer.setAutoStartup(false);
        setErrorMessageStrategy(new AmqpMessageHeaderErrorMessageStrategy());
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        Assert.notNull(messageConverter, "messageConverter must not be null");
        this.messageConverter = messageConverter;
    }

    public void setHeaderMapper(AmqpHeaderMapper amqpHeaderMapper) {
        Assert.notNull(amqpHeaderMapper, "headerMapper must not be null");
        this.headerMapper = amqpHeaderMapper;
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    public void setBatchingStrategy(BatchingStrategy batchingStrategy) {
        Assert.notNull(batchingStrategy, "'batchingStrategy' cannot be null");
        this.batchingStrategy = batchingStrategy;
    }

    public void setBindSourceMessage(boolean z) {
        this.bindSourceMessage = z;
    }

    public void setBatchMode(BatchMode batchMode) {
        Assert.notNull(batchMode, "'batchMode' cannot be null");
        this.batchMode = batchMode;
    }

    @Override // org.springframework.integration.context.IntegrationObjectSupport, org.springframework.integration.support.context.NamedComponent
    public String getComponentType() {
        return "amqp:inbound-channel-adapter";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        if (this.retryTemplate != null) {
            Assert.state(getErrorChannel() == null, "Cannot have an 'errorChannel' property when a 'RetryTemplate' is provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to send an error message when retries are exhausted");
        }
        this.messageListenerContainer.setMessageListener(this.messageListenerContainer.isConsumerBatchEnabled() ? new BatchListener() : new Listener());
        this.messageListenerContainer.afterPropertiesSet();
        super.onInit();
    }

    @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint
    protected void doStart() {
        this.messageListenerContainer.start();
    }

    @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint
    protected void doStop() {
        this.messageListenerContainer.stop();
    }

    @Override // org.springframework.integration.context.OrderlyShutdownCapable
    public int beforeShutdown() {
        stop();
        return 0;
    }

    @Override // org.springframework.integration.context.OrderlyShutdownCapable
    public int afterShutdown() {
        return 0;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setAttributesIfNecessary(Object obj, org.springframework.messaging.Message<?> message) {
        boolean z = getErrorChannel() != null && this.retryTemplate == null;
        boolean z2 = z || this.retryTemplate != null;
        if (z) {
            ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
        }
        if (z2) {
            AttributeAccessor context = this.retryTemplate != null ? RetrySynchronizationManager.getContext() : ATTRIBUTES_HOLDER.get();
            if (context != null) {
                context.setAttribute(ErrorMessageUtils.INPUT_MESSAGE_CONTEXT_KEY, message);
                context.setAttribute("amqp_raw_message", obj);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.endpoint.MessageProducerSupport
    public AttributeAccessor getErrorMessageAttributes(org.springframework.messaging.Message<?> message) {
        AttributeAccessor attributeAccessor = ATTRIBUTES_HOLDER.get();
        return attributeAccessor == null ? super.getErrorMessageAttributes(message) : attributeAccessor;
    }
}
