package com.networknt.tram.consumer.kafka;

import com.networknt.eventuate.common.impl.JSonMapper;
import com.networknt.eventuate.kafka.consumer.EventuateKafkaConsumer;
import com.networknt.tram.message.common.Message;
import com.networknt.tram.message.common.MessageImpl;
import com.networknt.tram.message.consumer.MessageConsumer;
import com.networknt.tram.message.consumer.MessageHandler;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/networknt/tram/consumer/kafka/MessageConsumerKafkaImpl.class */
public class MessageConsumerKafkaImpl implements MessageConsumer {
    private DuplicateMessageDetector duplicateMessageDetector;
    private Logger logger = LoggerFactory.getLogger(MessageConsumerKafkaImpl.class);
    private List<EventuateKafkaConsumer> consumers = new ArrayList();

    public MessageConsumerKafkaImpl(DuplicateMessageDetector duplicateMessageDetector) {
        this.duplicateMessageDetector = duplicateMessageDetector;
    }

    public void subscribe(String str, Set<String> set, MessageHandler messageHandler) {
        EventuateKafkaConsumer eventuateKafkaConsumer = new EventuateKafkaConsumer(str, (consumerRecord, biConsumer) -> {
            Message message = toMessage(consumerRecord);
            if (this.duplicateMessageDetector.isDuplicate(str, message.getId())) {
                this.logger.trace("Duplicate message {} {}", str, message.getId());
                biConsumer.accept(null, null);
            }
            try {
                this.logger.trace("Invoking handler {} {}", str, message.getId());
                messageHandler.accept(message);
                this.logger.trace("handled message {} {}", str, message.getId());
                biConsumer.accept(null, null);
            } catch (Throwable th) {
                this.logger.trace("Got exception {} {}", str, message.getId());
                this.logger.trace("Got exception ", th);
                biConsumer.accept(null, th);
            }
        }, new ArrayList(set));
        this.consumers.add(eventuateKafkaConsumer);
        eventuateKafkaConsumer.start();
    }

    public void close() {
        this.consumers.forEach((v0) -> {
            v0.stop();
        });
    }

    private Message toMessage(ConsumerRecord<String, String> consumerRecord) {
        return (Message) JSonMapper.fromJson((String) consumerRecord.value(), MessageImpl.class);
    }
}
