package io.camunda.connector.kafka.inbound;

import io.camunda.connector.api.error.ConnectorInputException;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorResult;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/connector/kafka/inbound/KafkaConnectorConsumer.class */
public class KafkaConnectorConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectorConsumer.class);
    private final InboundConnectorContext context;
    public CompletableFuture<?> future;
    Consumer<String, String> consumer;
    KafkaConnectorProperties elementProps;
    private final Function<Properties, Consumer<String, String>> consumerCreatorFunction;
    boolean shouldLoop = true;
    private ExecutorService executorService = Executors.newSingleThreadExecutor();

    public KafkaConnectorConsumer(Function<Properties, Consumer<String, String>> function, InboundConnectorContext inboundConnectorContext, KafkaConnectorProperties kafkaConnectorProperties) {
        this.consumerCreatorFunction = function;
        this.context = inboundConnectorContext;
        this.elementProps = kafkaConnectorProperties;
    }

    public void startConsumer() {
        this.future = CompletableFuture.runAsync(() -> {
            prepareConsumer();
            consume();
        }, this.executorService);
    }

    private void prepareConsumer() {
        try {
            this.consumer = this.consumerCreatorFunction.apply(KafkaPropertyTransformer.getKafkaProperties(this.elementProps, this.context));
            List<TopicPartition> assignTopicPartitions = assignTopicPartitions(this.consumer, this.elementProps.getTopic().getTopicName());
            Optional.ofNullable(this.elementProps.getOffsets()).ifPresent(list -> {
                seekOffsets(this.consumer, assignTopicPartitions, list);
            });
            reportUp();
        } catch (Exception e) {
            LOG.error("Failed to initialize connector: {}", e.getMessage());
            this.context.reportHealth(Health.down(e));
            throw e;
        }
    }

    private List<TopicPartition> assignTopicPartitions(Consumer<String, String> consumer, String str) {
        List<TopicPartition> list = (List) consumer.partitionsFor(str).stream().map(partitionInfo -> {
            return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
        }).collect(Collectors.toList());
        consumer.assign(list);
        return list;
    }

    private void seekOffsets(Consumer<String, String> consumer, List<TopicPartition> list, List<Long> list2) {
        if (list.size() != list2.size()) {
            throw new ConnectorInputException(new IllegalArgumentException("Number of offsets provided is not equal the number of partitions!"));
        }
        for (int i = 0; i < list2.size(); i++) {
            consumer.seek(list.get(i), list2.get(i).longValue());
        }
        LOG.info("Kafka inbound connector initialized");
    }

    public void consume() {
        while (this.shouldLoop) {
            try {
                pollAndPublish();
                reportUp();
            } catch (Exception e) {
                LOG.error("Failed to execute connector: {}", e.getMessage());
                this.context.reportHealth(Health.down(e));
                if (e instanceof OffsetOutOfRangeException) {
                    throw e;
                }
            }
        }
        LOG.debug("Kafka inbound loop finished");
    }

    private void pollAndPublish() {
        LOG.debug("Polling the topics: {}", this.consumer.assignment());
        ConsumerRecords poll = this.consumer.poll(Duration.ofMillis(500L));
        Iterator it = poll.iterator();
        while (it.hasNext()) {
            handleMessage((ConsumerRecord) it.next());
        }
        if (poll.isEmpty()) {
            return;
        }
        this.consumer.commitSync();
    }

    private void handleMessage(ConsumerRecord<String, String> consumerRecord) {
        LOG.trace("Kafka message received: key = {}, value = {}", consumerRecord.key(), consumerRecord.value());
        InboundConnectorResult correlate = this.context.correlate(KafkaPropertyTransformer.convertConsumerRecordToKafkaInboundMessage(consumerRecord));
        if (correlate.isActivated()) {
            LOG.debug("Inbound event correlated successfully: {}", correlate.getResponseData());
        } else {
            LOG.debug("Inbound event not correlated: {}", correlate.getErrorData());
        }
    }

    public void stopConsumer() throws ExecutionException, InterruptedException {
        this.shouldLoop = false;
        if (this.future != null && !this.future.isDone()) {
            this.future.get();
        }
        this.consumer.close();
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    private void reportUp() {
        HashMap hashMap = new HashMap();
        hashMap.put("group-id", this.consumer.groupMetadata().groupId());
        hashMap.put("group-instance-id", this.consumer.groupMetadata().groupInstanceId().orElse("unknown"));
        hashMap.put("group-generation-id", Integer.valueOf(this.consumer.groupMetadata().generationId()));
        this.context.reportHealth(Health.up(hashMap));
    }
}
