/*
 * Decompiled with CFR 0.152.
 */
package cn.hippo4j.adapter.kafka;

import cn.hippo4j.adapter.base.ThreadPoolAdapter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterParameter;
import cn.hippo4j.adapter.base.ThreadPoolAdapterState;
import cn.hippo4j.common.config.ApplicationContextHolder;
import cn.hippo4j.common.toolkit.ReflectUtil;
import java.lang.reflect.AccessibleObject;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.cglib.core.Constants;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericErrorHandler;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.support.TopicPartitionOffset;

/**
 * {@link ThreadPoolAdapter} that presents the consumer concurrency of Spring Kafka listener
 * containers as adjustable "thread pools", keyed by listener container id.
 *
 * <p>The {@link KafkaListenerEndpointRegistry} is looked up when the
 * {@link ApplicationStartedEvent} is received. Until that lookup has succeeded, the query and
 * update methods report "nothing found" instead of failing with a {@link NullPointerException}.
 */
public class KafkaThreadPoolAdapter implements ThreadPoolAdapter, ApplicationListener<ApplicationStartedEvent> {
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KafkaThreadPoolAdapter.class);

    /** Shared empty argument list for reflective calls to no-arg methods. */
    private static final Object[] NO_ARGS = new Object[0];

    /** Registry of listener containers; {@code null} until the started event was handled successfully. */
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    /**
     * Returns the marker that identifies this adapter type.
     *
     * @return the constant {@code "Kafka"}
     */
    public String mark() {
        return "Kafka";
    }

    /**
     * Reports the current size of the consumer "pool" of one listener container.
     *
     * @param identify listener container id
     * @return a state whose core and maximum size both equal the container's concurrency
     *         ({@code 1} for containers that are not concurrent); an empty state when no
     *         container with that id can be resolved
     */
    public ThreadPoolAdapterState getThreadPoolState(String identify) {
        ThreadPoolAdapterState result = new ThreadPoolAdapterState();
        MessageListenerContainer listenerContainer = this.findListenerContainer(identify);
        if (listenerContainer == null) {
            log.warn("[{}] Kafka consuming thread pool not found.", (Object)identify);
            return result;
        }
        result.setThreadPoolKey(identify);
        if (listenerContainer instanceof ConcurrentMessageListenerContainer) {
            result.setCoreSize(Integer.valueOf(((ConcurrentMessageListenerContainer)listenerContainer).getConcurrency()));
            result.setMaximumSize(result.getCoreSize());
        } else {
            // Any other container type is reported as a single consumer thread.
            result.setCoreSize(Integer.valueOf(1));
            result.setMaximumSize(Integer.valueOf(1));
        }
        return result;
    }

    /**
     * Reports the state of every listener container known to the registry.
     *
     * @return one state per registered container id; an empty list when the registry is not
     *         available yet
     */
    public List<ThreadPoolAdapterState> getThreadPoolStates() {
        List<ThreadPoolAdapterState> adapterStateList = new ArrayList<>();
        KafkaListenerEndpointRegistry registry = this.kafkaListenerEndpointRegistry;
        if (registry == null) {
            log.warn("Kafka listener endpoint registry is not available, no thread pool state to report.");
            return adapterStateList;
        }
        for (String id : registry.getListenerContainerIds()) {
            adapterStateList.add(this.getThreadPoolState(id));
        }
        return adapterStateList;
    }

    /**
     * Changes the concurrency of a concurrent listener container to the requested core pool size
     * by starting additional child containers or stopping running ones.
     *
     * @param threadPoolAdapterParameter carries the container id (thread pool key) and the target
     *                                   concurrency (core pool size)
     * @return {@code true} when the concurrency was applied; {@code false} when the container is
     *         unknown, is not a {@link ConcurrentMessageListenerContainer}, the target size is
     *         missing or not positive, or the target exceeds the explicitly assigned partitions
     */
    public boolean updateThreadPool(ThreadPoolAdapterParameter threadPoolAdapterParameter) {
        String threadPoolKey = threadPoolAdapterParameter.getThreadPoolKey();
        MessageListenerContainer listenerContainer = this.findListenerContainer(threadPoolKey);
        if (listenerContainer == null) {
            log.warn("[{}] Kafka consuming thread pool not found.", (Object)threadPoolKey);
            return false;
        }
        if (!(listenerContainer instanceof ConcurrentMessageListenerContainer)) {
            log.warn("[{}] Kafka consuming thread pool not support modify.", (Object)threadPoolKey);
            return false;
        }
        Integer concurrency = threadPoolAdapterParameter.getCorePoolSize();
        // Validate before touching any consumer: a missing value would fail on unboxing, and a
        // non-positive one would stop running consumers below before setConcurrency is reached.
        // NOTE(review): Spring Kafka's setConcurrency is expected to reject values <= 0 - confirm
        // against the Spring Kafka version in use.
        if (concurrency == null || concurrency < 1) {
            log.warn("[{}] Kafka consuming thread pool not support modify. The concurrency must be greater than 0, but was {}.", threadPoolKey, concurrency);
            return false;
        }
        ConcurrentMessageListenerContainer concurrentContainer = (ConcurrentMessageListenerContainer)listenerContainer;
        int originalCoreSize = concurrentContainer.getConcurrency();
        if (originalCoreSize < concurrency) {
            if (!KafkaThreadPoolAdapter.addConsumer(threadPoolKey, concurrentContainer, originalCoreSize, concurrency)) {
                return false;
            }
        } else if (originalCoreSize > concurrency) {
            KafkaThreadPoolAdapter.decreaseConsumer(threadPoolKey, concurrentContainer, originalCoreSize, concurrency);
        }
        concurrentContainer.setConcurrency(concurrency.intValue());
        // Core and maximum size are the same quantity for a Kafka container, so both report the same change.
        String change = String.format("%s => %s", originalCoreSize, concurrency);
        log.info("[{}] Kafka consumption thread pool parameter change. coreSize: {}, maximumSize: {}", threadPoolKey, change, change);
        return true;
    }

    /**
     * Resolves a listener container by id without throwing when the lookup cannot be performed.
     *
     * @param id listener container id
     * @return the container, or {@code null} when the registry is not available, the id is
     *         {@code null} or blank, or the registry returns no container for the id
     */
    private MessageListenerContainer findListenerContainer(String id) {
        KafkaListenerEndpointRegistry registry = this.kafkaListenerEndpointRegistry;
        if (registry == null || id == null || id.trim().isEmpty()) {
            return null;
        }
        return registry.getListenerContainer(id);
    }

    /**
     * Stops running child containers until the requested reduction is reached or no running
     * container is left.
     *
     * <p>Stopped containers are not removed from the parent's {@code containers} list.
     * NOTE(review): {@code addConsumer} derives new container indexes from that list's size, so
     * indexes keep growing across decrease/increase cycles - confirm this is intended.
     *
     * @param threadPoolKey       container id, used for logging only
     * @param concurrentContainer parent container whose children are stopped
     * @param originalCoreSize    concurrency before the change
     * @param concurrency         target concurrency
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static void decreaseConsumer(String threadPoolKey, ConcurrentMessageListenerContainer concurrentContainer, int originalCoreSize, Integer concurrency) {
        int targetDecrease = originalCoreSize - concurrency;
        // The child container list is not exposed as mutable API, hence the reflective field access.
        List<KafkaMessageListenerContainer> containers = (List<KafkaMessageListenerContainer>)ReflectUtil.getFieldValue(concurrentContainer, "containers");
        int stopped = 0;
        for (KafkaMessageListenerContainer container : containers) {
            if (stopped >= targetDecrease) {
                break;
            }
            if (!container.isRunning()) {
                continue;
            }
            container.stop(() -> {});
            ++stopped;
        }
        log.info("[{}] Kafka consumption change. target decrease {} ,real decrease {}", threadPoolKey, targetDecrease, stopped);
    }

    /**
     * Creates, configures and starts the additional child containers needed to reach the target
     * concurrency, copying the relevant settings from the parent container.
     *
     * @param threadPoolKey       container id, used for logging only
     * @param concurrentContainer parent container that receives the new children
     * @param originalCoreSize    concurrency before the change
     * @param concurrency         target concurrency
     * @return {@code false} when explicit topic partitions are configured and the target
     *         concurrency exceeds their number; {@code true} once all new children were started
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static boolean addConsumer(String threadPoolKey, ConcurrentMessageListenerContainer concurrentContainer, int originalCoreSize, Integer concurrency) {
        ContainerProperties containerProperties = concurrentContainer.getContainerProperties();
        TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();
        if (topicPartitions != null && concurrency > topicPartitions.length) {
            log.warn("[{}] Kafka consuming thread pool not support modify. When specific partitions are provided, the concurrency must be less than or equal to the number of partitions;", (Object)threadPoolKey);
            return false;
        }
        List<KafkaMessageListenerContainer> containers = (List<KafkaMessageListenerContainer>)ReflectUtil.getFieldValue(concurrentContainer, "containers");
        boolean alwaysClientIdSuffix = (Boolean)ReflectUtil.getFieldValue(concurrentContainer, "alwaysClientIdSuffix");

        // Loop-invariant lookups: resolve the reflective accessors and the bean name once.
        Class<?> containerClass = concurrentContainer.getClass();
        Method getRecordInterceptor = KafkaThreadPoolAdapter.accessibleNoArgMethod(containerClass, "getRecordInterceptor");
        Method isInterceptBeforeTx = KafkaThreadPoolAdapter.accessibleNoArgMethod(containerClass, "isInterceptBeforeTx");
        Method isPaused = KafkaThreadPoolAdapter.accessibleNoArgMethod(containerClass, "isPaused");
        String parentBeanName = concurrentContainer.getBeanName();
        String beanNamePrefix = parentBeanName != null ? parentBeanName : "consumer";

        // New children are indexed after every existing entry of the list, running or not.
        int firstIndex = containers.size();
        int endIndex = firstIndex + (concurrency - originalCoreSize);
        for (int i = firstIndex; i < endIndex; ++i) {
            KafkaMessageListenerContainer container = (KafkaMessageListenerContainer)ReflectUtil.invoke(concurrentContainer, "constructContainer", new Object[]{containerProperties, topicPartitions, i});
            container.setBeanName(beanNamePrefix + "-" + i);
            container.setApplicationContext(ApplicationContextHolder.getInstance());
            if (concurrentContainer.getApplicationEventPublisher() != null) {
                container.setApplicationEventPublisher(concurrentContainer.getApplicationEventPublisher());
            }
            container.setClientIdSuffix(concurrency > 1 || alwaysClientIdSuffix ? "-" + i : "");
            container.setGenericErrorHandler((GenericErrorHandler)ReflectUtil.invoke(concurrentContainer, "getGenericErrorHandler", NO_ARGS));
            container.setAfterRollbackProcessor((AfterRollbackProcessor)ReflectUtil.invoke(concurrentContainer, "getAfterRollbackProcessor", NO_ARGS));
            container.setRecordInterceptor((RecordInterceptor)ReflectUtil.invoke(concurrentContainer, getRecordInterceptor, NO_ARGS));
            container.setInterceptBeforeTx(((Boolean)ReflectUtil.invoke(concurrentContainer, isInterceptBeforeTx, NO_ARGS)).booleanValue());
            // An emergency stop of a child stops the whole parent and publishes its stopped event.
            container.setEmergencyStop(() -> {
                concurrentContainer.stop(() -> {});
                ReflectUtil.invoke(concurrentContainer, "publishContainerStoppedEvent", NO_ARGS);
            });
            // Read the paused flag per child so a pause issued while scaling is still honoured.
            if (((Boolean)ReflectUtil.invoke(concurrentContainer, isPaused, NO_ARGS)).booleanValue()) {
                container.pause();
            }
            container.start();
            containers.add(container);
        }
        return true;
    }

    /**
     * Looks up a no-argument method via {@link ReflectUtil} and makes it accessible.
     *
     * @param type       class to search
     * @param methodName name of the no-argument method
     * @return the method as returned by {@code ReflectUtil.findDeclaredMethod}
     */
    @SuppressWarnings("rawtypes")
    private static Method accessibleNoArgMethod(Class<?> type, String methodName) {
        Method method = ReflectUtil.findDeclaredMethod(type, methodName, (Class[])Constants.EMPTY_CLASS_ARRAY);
        ReflectUtil.setAccessible((AccessibleObject)method);
        return method;
    }

    /**
     * Captures the {@link KafkaListenerEndpointRegistry} bean once the application has started.
     * A failed lookup is logged and leaves the adapter without a registry.
     *
     * @param event the application-started event (not inspected)
     */
    public void onApplicationEvent(ApplicationStartedEvent event) {
        try {
            this.kafkaListenerEndpointRegistry = (KafkaListenerEndpointRegistry)ApplicationContextHolder.getBean(KafkaListenerEndpointRegistry.class);
        } catch (Exception ex) {
            log.error("Failed to get Kafka thread pool.", (Throwable)ex);
        }
    }
}

