package io.camunda.connector.kafka.inbound;

import connector.com.fasterxml.jackson.core.JsonParser;
import connector.com.fasterxml.jackson.core.json.JsonReadFeature;
import connector.com.fasterxml.jackson.databind.DeserializationFeature;
import connector.com.fasterxml.jackson.databind.ObjectMapper;
import connector.com.fasterxml.jackson.databind.ObjectReader;
import connector.com.fasterxml.jackson.databind.SerializationFeature;
import connector.com.fasterxml.jackson.dataformat.avro.AvroMapper;
import connector.com.fasterxml.jackson.dataformat.avro.AvroSchema;
import connector.com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
import connector.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import connector.com.fasterxml.jackson.module.scala.DefaultScalaModule$;
import dev.failsafe.Failsafe;
import dev.failsafe.RetryPolicy;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.network.ClientInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/connector/kafka/inbound/KafkaConnectorConsumer.class */
public class KafkaConnectorConsumer {
    private final InboundConnectorContext context;
    public CompletableFuture<?> future;
    Consumer<Object, Object> consumer;
    KafkaConnectorProperties elementProps;
    private final RetryPolicy<Object> retryPolicy;
    private final Function<Properties, Consumer<Object, Object>> consumerCreatorFunction;
    private ObjectReader avroObjectReader;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaConnectorConsumer.class);
    public static ObjectMapper objectMapper = new ObjectMapper().registerModule(new Jdk8Module()).registerModule(DefaultScalaModule$.MODULE$).registerModule(new JavaTimeModule()).disable(SerializationFeature.FAIL_ON_EMPTY_BEANS).disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES).enable(JsonParser.Feature.ALLOW_SINGLE_QUOTES).enable(JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature());
    private Health consumerStatus = Health.unknown();
    boolean shouldLoop = true;
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();

    public KafkaConnectorConsumer(Function<Properties, Consumer<Object, Object>> function, InboundConnectorContext inboundConnectorContext, KafkaConnectorProperties kafkaConnectorProperties, RetryPolicy<Object> retryPolicy) {
        this.consumerCreatorFunction = function;
        this.context = inboundConnectorContext;
        this.elementProps = kafkaConnectorProperties;
        this.retryPolicy = retryPolicy;
    }

    public void startConsumer() {
        if (this.elementProps.getAvro() != null) {
            this.avroObjectReader = new AvroMapper().reader(new AvroSchema(new Schema.Parser().setValidate(true).parse(StringEscapeUtils.unescapeJson(this.elementProps.getAvro().schema()))));
        }
        this.future = Failsafe.with(this.retryPolicy, new RetryPolicy[0]).with(this.executorService).getAsync(() -> {
            try {
                prepareConsumer();
                consume();
                return null;
            } catch (Exception e) {
                LOG.error("Consumer loop failure, retry pending: {}", e.getMessage());
                try {
                    this.consumer.close();
                } catch (Exception e2) {
                    LOG.error("Failed to close consumer before retrying, reason: {}. This error will be ignored. If the consumer is still running, it will be disconnected after max.poll.interval.ms.", e2.getMessage());
                }
                throw e;
            }
        }).exceptionally(th -> {
            this.shouldLoop = false;
            return null;
        });
    }

    private void prepareConsumer() {
        try {
            this.consumer = this.consumerCreatorFunction.apply(KafkaPropertyTransformer.getKafkaProperties(this.elementProps, this.context));
            String topicName = this.elementProps.getTopic().getTopicName();
            this.consumer.subscribe(List.of(topicName), new OffsetUpdateRequiredListener(topicName, this.consumer, this.elementProps.getOffsets()));
            reportUp();
        } catch (Exception e) {
            LOG.error("Failed to initialize connector: {}", e.getMessage());
            this.context.reportHealth(Health.down(e));
            throw e;
        }
    }

    public void consume() {
        while (this.shouldLoop) {
            try {
                pollAndPublish();
                reportUp();
            } catch (Exception e) {
                reportDown(e);
                throw e;
            }
        }
        LOG.debug("Kafka inbound loop finished");
    }

    private void pollAndPublish() {
        LOG.debug("Polling the topics: {}", this.consumer.assignment());
        ConsumerRecords<Object, Object> poll = this.consumer.poll(Duration.ofMillis(500L));
        Iterator<ConsumerRecord<Object, Object>> it = poll.iterator();
        while (it.hasNext()) {
            handleMessage(it.next());
        }
        if (poll.isEmpty()) {
            return;
        }
        this.consumer.commitSync();
    }

    private void handleMessage(ConsumerRecord<Object, Object> consumerRecord) {
        LOG.trace("Kafka message received: key = {}, value = {}", consumerRecord.key(), consumerRecord.value());
        this.context.correlate(KafkaPropertyTransformer.convertConsumerRecordToKafkaInboundMessage(consumerRecord, this.avroObjectReader != null ? this.avroObjectReader : objectMapper.reader()));
    }

    public void stopConsumer() throws ExecutionException, InterruptedException {
        this.shouldLoop = false;
        if (this.future != null && !this.future.isDone()) {
            this.future.get();
        }
        this.consumer.close();
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    private void reportUp() {
        HashMap hashMap = new HashMap();
        hashMap.put("group-id", this.consumer.groupMetadata().groupId());
        hashMap.put("group-instance-id", this.consumer.groupMetadata().groupInstanceId().orElse(ClientInformation.UNKNOWN_NAME_OR_VERSION));
        hashMap.put("group-generation-id", Integer.valueOf(this.consumer.groupMetadata().generationId()));
        Health up = Health.up(hashMap);
        if (up.equals(this.consumerStatus)) {
            return;
        }
        this.consumerStatus = up;
        this.context.reportHealth(Health.up(hashMap));
        LOG.info("Consumer status changed to UP, process {}, version {}, element {} ", this.context.getDefinition().bpmnProcessId(), this.context.getDefinition().version(), this.context.getDefinition().elementId());
    }

    private void reportDown(Throwable th) {
        Health down = Health.down(th);
        if (down.equals(this.consumerStatus)) {
            return;
        }
        this.consumerStatus = down;
        this.context.reportHealth(Health.down(th));
        LOG.error("Kafka Consumer status changed to DOWN, process {}, version {}, element {}", this.context.getDefinition().bpmnProcessId(), this.context.getDefinition().version(), this.context.getDefinition().elementId(), th);
    }
}
