package org.apache.eventmesh.connector.kafka.consumer;

import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventDeserializer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.api.EventListener;
import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/connector/kafka/consumer/ConsumerImpl.class */
public class ConsumerImpl {
    public static final Logger logger = LoggerFactory.getLogger(ConsumerImpl.class);
    private final KafkaConsumer<String, CloudEvent> kafkaConsumer;
    private final Properties properties;
    private AtomicBoolean started = new AtomicBoolean(false);
    private EventListener eventListener;
    private KafkaConsumerRunner kafkaConsumerRunner;
    private ExecutorService executorService;
    private Set<String> topicsSet;

    public ConsumerImpl(Properties properties) {
        Properties properties2 = new Properties();
        properties2.put("bootstrap.servers", properties.getProperty("bootstrap.servers"));
        properties2.put("value.deserializer", CloudEventDeserializer.class);
        properties2.put("key.deserializer", StringDeserializer.class);
        properties2.put("group.id", properties.getProperty("group.id"));
        properties2.put("enable.auto.commit", "false");
        this.properties = properties2;
        this.kafkaConsumer = new KafkaConsumer<>(properties2);
        this.kafkaConsumerRunner = new KafkaConsumerRunner(this.kafkaConsumer);
        this.executorService = Executors.newFixedThreadPool(10);
        this.topicsSet = new HashSet();
    }

    public Properties attributes() {
        return this.properties;
    }

    public void start() {
        if (this.started.compareAndSet(false, true)) {
            this.executorService.submit(this.kafkaConsumerRunner);
        }
    }

    public synchronized void shutdown() {
        if (this.started.compareAndSet(true, false)) {
            this.kafkaConsumer.close();
        }
    }

    public boolean isStarted() {
        return this.started.get();
    }

    public boolean isClosed() {
        return !isStarted();
    }

    public KafkaConsumer<String, CloudEvent> getKafkaConsumer() {
        return this.kafkaConsumer;
    }

    public synchronized void subscribe(String str) {
        try {
            this.topicsSet.add(str);
            this.kafkaConsumer.subscribe(new ArrayList(this.topicsSet));
        } catch (Exception e) {
            logger.error("Error while subscribing the Kafka consumer to topic: ", e);
            throw new ConnectorRuntimeException(String.format("Kafka consumer can't attach to %s.", str));
        }
    }

    public synchronized void unsubscribe(String str) {
        try {
            this.kafkaConsumer.unsubscribe();
            this.topicsSet.remove(str);
            this.kafkaConsumer.subscribe(new ArrayList(this.topicsSet));
        } catch (Exception e) {
            logger.error("Error while unsubscribing the Kafka consumer: ", e);
            throw new ConnectorRuntimeException(String.format("kafka push consumer fails to unsubscribe topic: %s", str));
        }
    }

    public void updateOffset(List<CloudEvent> list, AbstractContext abstractContext) {
        Long l = (Long) list.stream().map(cloudEvent -> {
            return Long.valueOf(this.kafkaConsumerRunner.getOffset(cloudEvent));
        }).max((v0, v1) -> {
            return Long.compare(v0, v1);
        }).get();
        list.forEach(cloudEvent2 -> {
            updateOffset(cloudEvent2.getSubject(), l.longValue());
        });
    }

    public void updateOffset(String str, long j) {
        this.kafkaConsumer.seek(new TopicPartition(str, 1), j);
    }

    public void registerEventListener(EventListener eventListener) {
        this.eventListener = eventListener;
        this.kafkaConsumerRunner.setListener(this.eventListener);
    }
}
