package cool.doudou.doudada.mq.core.processor;

import cool.doudou.doudada.mq.annotation.MqConsumer;
import cool.doudou.doudada.mq.core.enums.MsgTypeEnum;
import cool.doudou.doudada.mq.properties.PulsarProperties;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;

/* loaded from: input_file:cool/doudou/doudada/mq/core/processor/ConsumerBeanPostProcessor.class */
public class ConsumerBeanPostProcessor implements BeanPostProcessor {
    private static final Logger log = LoggerFactory.getLogger(ConsumerBeanPostProcessor.class);
    private PulsarClient pulsarClient;
    private PulsarProperties pulsarProperties;

    public Object postProcessBeforeInitialization(Object obj, String str) throws BeansException {
        Arrays.stream(obj.getClass().getDeclaredMethods()).filter(method -> {
            return method.isAnnotationPresent(MqConsumer.class);
        }).forEach(method2 -> {
            MqConsumer mqConsumer = (MqConsumer) method2.getAnnotation(MqConsumer.class);
            initConsumer(mqConsumer.topics(), mqConsumer.msgType(), obj, method2);
        });
        return obj;
    }

    private void initConsumer(String[] strArr, MsgTypeEnum msgTypeEnum, Object obj, Method method) {
        if (strArr == null || strArr.length <= 0) {
            log.error("initConsumer error: @MqConsumer.topics must be specified");
            return;
        }
        try {
            this.pulsarClient.newConsumer(msgTypeEnum.get()).topic(strArr).subscriptionName(this.pulsarProperties.getSubscriptionName()).subscriptionType(SubscriptionType.valueOf(this.pulsarProperties.getSubscriptionType())).subscriptionInitialPosition(SubscriptionInitialPosition.valueOf(this.pulsarProperties.getSubscriptionInitialPosition())).negativeAckRedeliveryDelay(this.pulsarProperties.getNegativeAckRedeliveryDelay().intValue(), TimeUnit.SECONDS).messageListener((consumer, message) -> {
                try {
                    method.setAccessible(true);
                    method.invoke(obj, consumer.getTopic(), message.getValue());
                    consumer.acknowledge(message);
                } catch (Exception e) {
                    consumer.negativeAcknowledge(message);
                    throw new RuntimeException("bean[" + obj + "].method[" + method + "]invoke exception: ", e);
                }
            }).subscribe();
        } catch (PulsarClientException e) {
            log.error("initConsumer[{}] exception: ", strArr, e);
        }
    }

    public Object postProcessAfterInitialization(Object obj, String str) throws BeansException {
        return obj;
    }

    public ConsumerBeanPostProcessor(PulsarClient pulsarClient, PulsarProperties pulsarProperties) {
        this.pulsarClient = pulsarClient;
        this.pulsarProperties = pulsarProperties;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -368019756:
                if (implMethodName.equals("lambda$initConsumer$37753887$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("cool/doudou/doudada/mq/core/processor/ConsumerBeanPostProcessor") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/reflect/Method;Ljava/lang/Object;Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    Method method = (Method) serializedLambda.getCapturedArg(0);
                    Object capturedArg = serializedLambda.getCapturedArg(1);
                    return (consumer, message) -> {
                        try {
                            method.setAccessible(true);
                            method.invoke(capturedArg, consumer.getTopic(), message.getValue());
                            consumer.acknowledge(message);
                        } catch (Exception e) {
                            consumer.negativeAcknowledge(message);
                            throw new RuntimeException("bean[" + capturedArg + "].method[" + method + "]invoke exception: ", e);
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
