package com.github.linyuzai.event.kafka.subscriber;

import com.github.linyuzai.event.core.context.EventContext;
import com.github.linyuzai.event.core.error.EventErrorHandler;
import com.github.linyuzai.event.core.listener.EventListener;
import com.github.linyuzai.event.core.subscriber.Subscription;
import com.github.linyuzai.event.kafka.endpoint.KafkaEventEndpoint;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.MessageListenerContainer;

/* loaded from: input_file:com/github/linyuzai/event/kafka/subscriber/AbstractKafkaEventSubscriber.class */
public abstract class AbstractKafkaEventSubscriber extends KafkaEventSubscriber {
    public Subscription doSubscribe(EventListener eventListener, KafkaEventEndpoint kafkaEventEndpoint, EventContext eventContext) {
        MessageListenerContainer createMessageListenerContainer = createMessageListenerContainer(kafkaEventEndpoint, eventContext);
        if (createMessageListenerContainer.getContainerProperties().getMessageListener() == null) {
            createMessageListenerContainer.getContainerProperties().setMessageListener(createMessageListener(eventListener, kafkaEventEndpoint, eventContext));
        }
        createMessageListenerContainer.start();
        return new KafkaSubscription(createMessageListenerContainer);
    }

    public abstract MessageListenerContainer createMessageListenerContainer(KafkaEventEndpoint kafkaEventEndpoint, EventContext eventContext);

    public MessageListener<?, ?> createMessageListener(EventListener eventListener, KafkaEventEndpoint kafkaEventEndpoint, EventContext eventContext) {
        ContainerProperties.AckMode ackMode = kafkaEventEndpoint.getProperties().getListener().getAckMode();
        return (ackMode == ContainerProperties.AckMode.MANUAL || ackMode == ContainerProperties.AckMode.MANUAL_IMMEDIATE) ? (consumerRecord, acknowledgment) -> {
            acknowledgment.getClass();
            handleMessage(consumerRecord, eventListener, kafkaEventEndpoint, eventContext, acknowledgment::acknowledge);
        } : consumerRecord2 -> {
            handleMessage(consumerRecord2, eventListener, kafkaEventEndpoint, eventContext, null);
        };
    }

    public void handleMessage(ConsumerRecord<Object, Object> consumerRecord, EventListener eventListener, KafkaEventEndpoint kafkaEventEndpoint, EventContext eventContext, Runnable runnable) {
        EventErrorHandler eventErrorHandler = (EventErrorHandler) eventContext.get(EventErrorHandler.class);
        try {
            eventListener.onEvent(getPayload(consumerRecord, kafkaEventEndpoint, eventContext), kafkaEventEndpoint, eventContext);
            if (runnable != null) {
                runnable.run();
            }
        } catch (Throwable th) {
            eventErrorHandler.onError(th, kafkaEventEndpoint, eventContext);
        }
    }

    public Object getPayload(ConsumerRecord<?, ?> consumerRecord, KafkaEventEndpoint kafkaEventEndpoint, EventContext eventContext) {
        return consumerRecord.value();
    }
}
