/*
 * Decompiled with CFR 0.152.
 */
package cn.hippo4j.adapter.springcloud.stream.rabbitmq;

import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.CollectionUtil;
import cn.hippo4j.common.toolkit.ReflectUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.cloud.stream.binder.Binding;
import org.springframework.cloud.stream.binder.DefaultBinding;
import org.springframework.cloud.stream.binding.InputBindingLifecycle;
import org.springframework.context.ApplicationListener;
import org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter;

public class SpringCloudStreamRabbitMQThreadPoolAdapter
implements ThreadPoolAdapter,
ApplicationListener<ApplicationStartedEvent> {
    private static final Logger log = LoggerFactory.getLogger(SpringCloudStreamRabbitMQThreadPoolAdapter.class);
    private final Map<String, AbstractMessageListenerContainer> ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR = new HashMap<String, AbstractMessageListenerContainer>();

    public String mark() {
        return "RabbitMQSpringCloudStream";
    }

    public ThreadPoolAdapterState getThreadPoolState(String identify) {
        ThreadPoolAdapterState result = new ThreadPoolAdapterState();
        AbstractMessageListenerContainer messageListenerContainer = this.ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.get(identify);
        if (messageListenerContainer != null) {
            result.setThreadPoolKey(identify);
            if (messageListenerContainer instanceof SimpleMessageListenerContainer) {
                int concurrentConsumers = (Integer)ReflectUtil.getFieldValue((Object)messageListenerContainer, (String)"concurrentConsumers");
                result.setCoreSize(Integer.valueOf(concurrentConsumers));
                Object maxConcurrentConsumers = ReflectUtil.getFieldValue((Object)messageListenerContainer, (String)"maxConcurrentConsumers");
                if (maxConcurrentConsumers != null) {
                    result.setMaximumSize((Integer)maxConcurrentConsumers);
                } else {
                    result.setMaximumSize(Integer.valueOf(concurrentConsumers));
                }
            } else if (messageListenerContainer instanceof DirectMessageListenerContainer) {
                int consumersPerQueue = (Integer)ReflectUtil.getFieldValue((Object)messageListenerContainer, (String)"consumersPerQueue");
                result.setCoreSize(Integer.valueOf(consumersPerQueue));
                result.setMaximumSize(Integer.valueOf(consumersPerQueue));
            }
            return result;
        }
        log.warn("[{}] RabbitMQ consuming thread pool not found.", (Object)identify);
        return result;
    }

    public List<ThreadPoolAdapterState> getThreadPoolStates() {
        ArrayList<ThreadPoolAdapterState> adapterStateList = new ArrayList<ThreadPoolAdapterState>();
        this.ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.forEach((key, val) -> adapterStateList.add(this.getThreadPoolState((String)key)));
        return adapterStateList;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
        String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
        AbstractMessageListenerContainer messageListenerContainer = this.ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.get(threadPoolKey);
        if (messageListenerContainer != null) {
            Map<String, AbstractMessageListenerContainer> map = this.ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR;
            synchronized (map) {
                Integer corePoolSize = threadPoolAdapterParameter.getCorePoolSize();
                Integer maximumPoolSize = threadPoolAdapterParameter.getMaximumPoolSize();
                if (messageListenerContainer instanceof SimpleMessageListenerContainer) {
                    int originalCoreSize = (Integer)ReflectUtil.getFieldValue((Object)messageListenerContainer, (String)"concurrentConsumers");
                    Object maxConcurrentConsumers = ReflectUtil.getFieldValue((Object)messageListenerContainer, (String)"maxConcurrentConsumers");
                    int originalMaximumPoolSize = maxConcurrentConsumers != null ? (Integer)maxConcurrentConsumers : originalCoreSize;
                    SimpleMessageListenerContainer simpleMessageListenerContainer = (SimpleMessageListenerContainer)messageListenerContainer;
                    if (originalCoreSize > maximumPoolSize) {
                        simpleMessageListenerContainer.setConcurrentConsumers(corePoolSize.intValue());
                        simpleMessageListenerContainer.setMaxConcurrentConsumers(maximumPoolSize.intValue());
                    } else {
                        simpleMessageListenerContainer.setMaxConcurrentConsumers(maximumPoolSize.intValue());
                        simpleMessageListenerContainer.setConcurrentConsumers(corePoolSize.intValue());
                    }
                    log.info("[{}] RabbitMQ consumption thread pool parameter change. coreSize: {}, maximumSize: {}", new Object[]{threadPoolKey, String.format("%s => %s", originalCoreSize, corePoolSize), String.format("%s => %s", originalMaximumPoolSize, maximumPoolSize)});
                } else if (messageListenerContainer instanceof DirectMessageListenerContainer) {
                    int originalCoreSize = (Integer)ReflectUtil.getFieldValue((Object)messageListenerContainer, (String)"consumersPerQueue");
                    DirectMessageListenerContainer directMessageListenerContainer = (DirectMessageListenerContainer)messageListenerContainer;
                    directMessageListenerContainer.setConsumersPerQueue(maximumPoolSize.intValue());
                    log.info("[{}] RabbitMQ consumption thread pool parameter change. coreSize: {}", (Object)threadPoolKey, (Object)String.format("%s => %s", originalCoreSize, corePoolSize));
                } else {
                    log.warn("[{}] RabbitMQ consuming thread pool not support. messageListenerContainer: {}", (Object)threadPoolKey, messageListenerContainer.getClass());
                    return false;
                }
            }
            return true;
        }
        log.warn("[{}] RabbitMQ consuming thread pool not found.", (Object)threadPoolKey);
        return false;
    }

    public void onApplicationEvent(ApplicationStartedEvent event) {
        InputBindingLifecycle bindingLifecycle = (InputBindingLifecycle)ApplicationContextHolder.getBean(InputBindingLifecycle.class);
        Collection inputBindings = Optional.ofNullable(ReflectUtil.getFieldValue((Object)bindingLifecycle, (String)"inputBindings")).map(each -> (Collection)each).orElse(null);
        if (CollectionUtil.isEmpty((Collection)inputBindings)) {
            log.info("InputBindings record not found.");
            return;
        }
        try {
            for (Binding each2 : inputBindings) {
                String bindingName = each2.getBindingName();
                DefaultBinding defaultBinding = (DefaultBinding)each2;
                Object lifecycle = ReflectUtil.getFieldValue((Object)defaultBinding, (String)"lifecycle");
                if (!(lifecycle instanceof AmqpInboundChannelAdapter)) continue;
                AbstractMessageListenerContainer rabbitMQListenerContainer = (AbstractMessageListenerContainer)ReflectUtil.getFieldValue((Object)lifecycle, (String)"messageListenerContainer");
                this.ROCKET_MQ_SPRING_CLOUD_STREAM_CONSUME_EXECUTOR.put(bindingName, rabbitMQListenerContainer);
            }
        }
        catch (Exception ex) {
            log.error("Failed to get input-bindings thread pool.", (Throwable)ex);
        }
    }
}

