package org.springframework.integration.scattergather;

import org.springframework.aop.support.AopUtils;
import org.springframework.context.Lifecycle;
import org.springframework.integration.channel.FixedSubscriberChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.context.IntegrationContextUtils;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.endpoint.AbstractEndpoint;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.integration.endpoint.PollingConsumer;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.support.channel.HeaderChannelRegistry;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-core-4.3.17.RELEASE.jar:org/springframework/integration/scattergather/ScatterGatherHandler.class */
public class ScatterGatherHandler extends AbstractReplyProducingMessageHandler implements Lifecycle {
    private static final String GATHER_RESULT_CHANNEL = "gatherResultChannel";
    private final MessageChannel scatterChannel;
    private final MessageHandler gatherer;
    private MessageChannel gatherChannel;
    private long gatherTimeout;
    private AbstractEndpoint gatherEndpoint;
    private HeaderChannelRegistry replyChannelRegistry;

    public ScatterGatherHandler(MessageChannel messageChannel, MessageHandler messageHandler) {
        this.gatherTimeout = -1L;
        Assert.notNull(messageChannel, "'scatterChannel' must not be null");
        Assert.notNull(messageHandler, "'gatherer' must not be null");
        checkClass(AopUtils.getTargetClass(messageHandler), "org.springframework.integration.aggregator.AggregatingMessageHandler", "gatherer");
        this.scatterChannel = messageChannel;
        this.gatherer = messageHandler;
    }

    public ScatterGatherHandler(MessageHandler messageHandler, MessageHandler messageHandler2) {
        this(new FixedSubscriberChannel(messageHandler), messageHandler2);
        Assert.notNull(messageHandler, "'scatterer' must not be null");
        checkClass(AopUtils.getTargetClass(messageHandler), "org.springframework.integration.router.RecipientListRouter", "scatterer");
    }

    public void setGatherChannel(MessageChannel messageChannel) {
        this.gatherChannel = messageChannel;
    }

    public void setGatherTimeout(long j) {
        this.gatherTimeout = j;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.handler.AbstractReplyProducingMessageHandler
    public void doInit() {
        if (this.gatherChannel == null) {
            this.gatherChannel = new FixedSubscriberChannel(this.gatherer);
        } else {
            if (this.gatherChannel instanceof SubscribableChannel) {
                this.gatherEndpoint = new EventDrivenConsumer((SubscribableChannel) this.gatherChannel, this.gatherer);
            } else {
                if (!(this.gatherChannel instanceof PollableChannel)) {
                    throw new MessagingException("Unsupported 'replyChannel' type [" + this.gatherChannel.getClass() + "].SubscribableChannel or PollableChannel type are supported.");
                }
                this.gatherEndpoint = new PollingConsumer((PollableChannel) this.gatherChannel, this.gatherer);
                ((PollingConsumer) this.gatherEndpoint).setReceiveTimeout(this.gatherTimeout);
            }
            this.gatherEndpoint.setBeanFactory(getBeanFactory());
            this.gatherEndpoint.afterPropertiesSet();
        }
        ((MessageProducer) this.gatherer).setOutputChannel(new FixedSubscriberChannel(new MessageHandler() { // from class: org.springframework.integration.scattergather.ScatterGatherHandler.1
            @Override // org.springframework.messaging.MessageHandler
            public void handleMessage(Message<?> message) throws MessagingException {
                MessageHeaders headers = message.getHeaders();
                if (!headers.containsKey(ScatterGatherHandler.GATHER_RESULT_CHANNEL)) {
                    throw new MessageDeliveryException(message, "The 'gatherResultChannel' header is required to delivery gather result.");
                }
                Object obj = headers.get(ScatterGatherHandler.GATHER_RESULT_CHANNEL);
                if (obj instanceof MessageChannel) {
                    ScatterGatherHandler.this.messagingTemplate.send((MessagingTemplate) obj, message);
                } else if (obj instanceof String) {
                    ScatterGatherHandler.this.messagingTemplate.send((String) obj, message);
                }
            }
        }));
        this.replyChannelRegistry = (HeaderChannelRegistry) getBeanFactory().getBean(IntegrationContextUtils.INTEGRATION_HEADER_CHANNEL_REGISTRY_BEAN_NAME, HeaderChannelRegistry.class);
    }

    @Override // org.springframework.integration.handler.AbstractReplyProducingMessageHandler
    protected Object handleRequestMessage(Message<?> message) {
        QueueChannel queueChannel = new QueueChannel();
        this.messagingTemplate.send((MessagingTemplate) this.scatterChannel, getMessageBuilderFactory().fromMessage(message).setHeader(GATHER_RESULT_CHANNEL, this.replyChannelRegistry.channelToChannelName(queueChannel)).setReplyChannel(this.gatherChannel).build());
        Message<?> receive = queueChannel.receive(this.gatherTimeout);
        if (receive != null) {
            return receive.getPayload();
        }
        return null;
    }

    @Override // org.springframework.context.Lifecycle
    public void start() {
        if (this.gatherEndpoint != null) {
            this.gatherEndpoint.start();
        }
    }

    @Override // org.springframework.context.Lifecycle
    public void stop() {
        if (this.gatherEndpoint != null) {
            this.gatherEndpoint.stop();
        }
    }

    @Override // org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.gatherEndpoint == null || this.gatherEndpoint.isRunning();
    }

    private void checkClass(Class<?> cls, String str, String str2) throws LinkageError {
        Class<?> cls2 = null;
        try {
            cls2 = ClassUtils.forName(str, getClass().getClassLoader());
        } catch (Exception e) {
        }
        Assert.isAssignable(cls2, cls, "the '" + str2 + "' must be an " + str + " instance");
    }
}
