package io.camunda.connector.kafka.inbound;

import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorResult;
import io.camunda.connector.impl.ConnectorInputException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/connector/kafka/inbound/KafkaConnectorConsumer.class */
/**
 * Polls a Kafka topic on a background task and passes every received record to the
 * {@link InboundConnectorContext} for correlation.
 *
 * <p>{@link #startConsumer()} launches the polling task; {@link #stopConsumer()} asks it to
 * finish, waits for it, and closes the Kafka consumer. Connector health is published through
 * {@code context.reportHealth(...)}: "up" after initialization and after each poll cycle, "down"
 * when initialization or polling fails.
 */
public class KafkaConnectorConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectorConsumer.class);

    /** Maximum time a single {@code poll} call blocks waiting for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(500L);

    private final InboundConnectorContext context;

    /** Handle of the background polling task; {@code null} until {@link #startConsumer()} runs. */
    public CompletableFuture<?> future;

    /** Consumer created by the polling task; stays {@code null} if it was never created. */
    Consumer<String, String> consumer;

    /**
     * Loop flag. Declared {@code volatile} because it is cleared by the thread calling
     * {@link #stopConsumer()} while being read by the polling task.
     */
    volatile boolean shouldLoop = true;

    private final KafkaConnectorProperties elementProps;
    private final Function<Properties, Consumer<String, String>> consumerCreatorFunction;

    /**
     * Stores the collaborators; no Kafka resources are created until {@link #startConsumer()}.
     *
     * @param function factory that builds a Kafka consumer from a set of properties
     * @param inboundConnectorContext context used for correlation and health reporting
     * @param kafkaConnectorProperties connector configuration (topic, offsets, ...)
     */
    public KafkaConnectorConsumer(Function<Properties, Consumer<String, String>> function, InboundConnectorContext inboundConnectorContext, KafkaConnectorProperties kafkaConnectorProperties) {
        this.consumerCreatorFunction = function;
        this.context = inboundConnectorContext;
        this.elementProps = kafkaConnectorProperties;
    }

    /**
     * Starts the asynchronous polling task and stores its handle in {@link #future}.
     *
     * <p>Failures inside the task are logged, reported as "down" and rethrown, so they complete
     * {@link #future} exceptionally.
     */
    public void startConsumer() {
        this.future = CompletableFuture.runAsync(this::runPollingTask);
    }

    /**
     * Body of the background task: initialize the consumer, then poll until asked to stop.
     *
     * <p>Initialization and polling use separate try/catch blocks so that each failure is logged
     * and reported exactly once, under the message that matches the phase that failed.
     */
    private void runPollingTask() {
        try {
            this.consumer = createConsumer(this.consumerCreatorFunction, KafkaPropertyTransformer.getKafkaProperties(this.elementProps, this.context), this.elementProps, KafkaPropertyTransformer.getOffsets(this.elementProps.getOffsets()));
            reportUp();
        } catch (Exception e) {
            // Trailing throwable argument makes SLF4J print the stack trace as well.
            LOG.error("Failed to initialize connector: {}", e.getMessage(), e);
            this.context.reportHealth(Health.down(e));
            throw e;
        }
        try {
            while (this.shouldLoop) {
                pollAndCorrelate();
                reportUp();
            }
        } catch (Exception e) {
            LOG.error("Failed to execute connector: {}", e.getMessage(), e);
            this.context.reportHealth(Health.down(e));
            throw e;
        }
        LOG.debug("Kafka inbound loop finished");
    }

    /** Runs one poll cycle: fetch records, correlate each one, then commit if any were received. */
    private void pollAndCorrelate() {
        ConsumerRecords<String, String> records = this.consumer.poll(POLL_TIMEOUT);
        for (ConsumerRecord<String, String> consumerRecord : records) {
            handleMessage(consumerRecord);
        }
        if (!records.isEmpty()) {
            this.consumer.commitSync();
        }
    }

    /**
     * Converts a record to an inbound message, correlates it and logs the outcome.
     *
     * @param consumerRecord record received from Kafka
     */
    private void handleMessage(ConsumerRecord<String, String> consumerRecord) {
        LOG.trace("Kafka message received: key = {}, value = {}", consumerRecord.key(), consumerRecord.value());
        InboundConnectorResult<?> correlate = this.context.correlate(KafkaPropertyTransformer.convertConsumerRecordToKafkaInboundMessage(consumerRecord));
        if (correlate.isActivated()) {
            LOG.debug("Inbound event correlated successfully: {}", correlate.getResponseData());
        } else {
            LOG.debug("Inbound event not correlated: {}", correlate.getErrorData());
        }
    }

    /**
     * Requests the polling task to stop, waits for it to finish and closes the consumer.
     *
     * <p>The consumer is closed even when the polling task ended with a failure. Nothing is
     * closed if no consumer was ever created.
     *
     * @throws ExecutionException if the polling task terminated with an exception
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void stopConsumer() throws ExecutionException, InterruptedException {
        this.shouldLoop = false;
        try {
            if (this.future != null && !this.future.isDone()) {
                this.future.get();
            }
        } finally {
            // Close only once the polling task can no longer be using the consumer: Kafka
            // consumers are not safe for concurrent access. If the wait was interrupted the
            // task may still be polling, so the consumer is left alone in that case.
            boolean pollingFinished = this.future == null || this.future.isDone();
            if (this.consumer != null && pollingFinished) {
                this.consumer.close();
            }
        }
    }

    /**
     * Builds a consumer, assigns it every partition of the configured topic and, when offsets
     * are supplied, seeks each partition to its offset (matched by list position).
     *
     * <p>If any step after creation fails, the freshly created consumer is closed before the
     * exception is propagated so that it does not leak.
     *
     * @param function factory that builds the consumer
     * @param properties Kafka client properties passed to the factory
     * @param kafkaConnectorProperties connector configuration providing the topic name
     * @param list start offsets, one per partition, or {@code null} to skip seeking
     * @return the assigned (and optionally positioned) consumer
     * @throws ConnectorInputException if the number of offsets differs from the number of partitions
     */
    private Consumer<String, String> createConsumer(Function<Properties, Consumer<String, String>> function, Properties properties, KafkaConnectorProperties kafkaConnectorProperties, List<Long> list) {
        Consumer<String, String> kafkaConsumer = function.apply(properties);
        try {
            List<PartitionInfo> partitions = kafkaConsumer.partitionsFor(kafkaConnectorProperties.getTopic().getTopicName());
            List<TopicPartition> topicPartitions = partitions.stream()
                    .map(partitionInfo -> new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
                    .collect(Collectors.toList());
            kafkaConsumer.assign(topicPartitions);
            if (list != null) {
                if (partitions.size() != list.size()) {
                    throw new ConnectorInputException(new IllegalArgumentException("Number of offsets provided is not equal the number of partitions!"));
                }
                for (int i = 0; i < list.size(); i++) {
                    kafkaConsumer.seek(topicPartitions.get(i), list.get(i).longValue());
                }
            }
        } catch (RuntimeException e) {
            try {
                kafkaConsumer.close();
            } catch (RuntimeException closeFailure) {
                // Keep the original failure as the primary one.
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        LOG.info("Kafka inbound connector initialized");
        return kafkaConsumer;
    }

    /** Reports the connector as "up", attaching the consumer's current group metadata. */
    private void reportUp() {
        HashMap<String, Object> details = new HashMap<>();
        details.put("group-id", this.consumer.groupMetadata().groupId());
        details.put("group-instance-id", this.consumer.groupMetadata().groupInstanceId().orElse("unknown"));
        details.put("group-generation-id", Integer.valueOf(this.consumer.groupMetadata().generationId()));
        this.context.reportHealth(Health.up(details));
    }
}
