package io.camunda.connector.kafka.outbound;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import io.camunda.connector.api.annotation.OutboundConnector;
import io.camunda.connector.api.error.ConnectorException;
import io.camunda.connector.api.json.ConnectorsObjectMapperSupplier;
import io.camunda.connector.api.outbound.OutboundConnectorContext;
import io.camunda.connector.api.outbound.OutboundConnectorFunction;
import io.camunda.connector.kafka.outbound.model.KafkaConnectorRequest;
import io.camunda.connector.kafka.outbound.model.KafkaConnectorResponse;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

@OutboundConnector(name = "Kafka Producer", inputVariables = {"authentication", "topic", "message", "additionalProperties", "headers", "avro"}, type = "io.camunda:connector-kafka:1")
/* loaded from: input_file:io/camunda/connector/kafka/outbound/KafkaConnectorFunction.class */
public class KafkaConnectorFunction implements OutboundConnectorFunction {
    private final Function<Properties, Producer> producerCreatorFunction;
    private static final ObjectMapper objectMapper = ConnectorsObjectMapperSupplier.getCopy().enable(new JsonParser.Feature[]{JsonParser.Feature.ALLOW_SINGLE_QUOTES});

    public KafkaConnectorFunction() {
        this(KafkaProducer::new);
    }

    public KafkaConnectorFunction(Function<Properties, Producer> function) {
        this.producerCreatorFunction = function;
    }

    public Object execute(OutboundConnectorContext outboundConnectorContext) {
        return executeConnector((KafkaConnectorRequest) outboundConnectorContext.bindVariables(KafkaConnectorRequest.class));
    }

    public static byte[] produceAvroMessage(KafkaConnectorRequest kafkaConnectorRequest) throws Exception {
        AvroSchema avroSchema = new AvroSchema(new Schema.Parser().setValidate(true).parse(StringEscapeUtils.unescapeJson(kafkaConnectorRequest.getAvro().schema())));
        AvroMapper avroMapper = new AvroMapper();
        Object value = kafkaConnectorRequest.getMessage().getValue();
        if (value instanceof String) {
            value = objectMapper.readTree(StringEscapeUtils.unescapeJson((String) value));
        }
        return avroMapper.writer(avroSchema).writeValueAsBytes(value);
    }

    private KafkaConnectorResponse executeConnector(KafkaConnectorRequest kafkaConnectorRequest) {
        Producer apply = this.producerCreatorFunction.apply(kafkaConnectorRequest.assembleKafkaClientProperties());
        try {
            try {
                ProducerRecord<String, Object> createProducerRecord = createProducerRecord(kafkaConnectorRequest);
                addHeadersToProducerRecord(createProducerRecord, kafkaConnectorRequest.getHeaders());
                KafkaConnectorResponse constructKafkaConnectorResponse = constructKafkaConnectorResponse((RecordMetadata) apply.send(createProducerRecord).get(45L, TimeUnit.SECONDS));
                apply.close();
                return constructKafkaConnectorResponse;
            } catch (Exception e) {
                throw new ConnectorException("FAIL", "Kafka Producer execution exception", e);
            }
        } catch (Throwable th) {
            apply.close();
            throw th;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v13, types: [byte[]] */
    private ProducerRecord<String, Object> createProducerRecord(KafkaConnectorRequest kafkaConnectorRequest) throws Exception {
        return new ProducerRecord<>(kafkaConnectorRequest.getTopic().getTopicName(), (Integer) null, (Long) null, transformData(kafkaConnectorRequest.getMessage().getKey()), kafkaConnectorRequest.getAvro() != null ? produceAvroMessage(kafkaConnectorRequest) : transformData(kafkaConnectorRequest.getMessage().getValue()));
    }

    public static String transformData(Object obj) throws JsonProcessingException {
        return obj instanceof String ? (String) obj : objectMapper.writeValueAsString(obj);
    }

    private void addHeadersToProducerRecord(ProducerRecord<String, Object> producerRecord, Map<String, String> map) {
        if (map != null) {
            for (Map.Entry<String, String> entry : map.entrySet()) {
                producerRecord.headers().add(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8));
            }
        }
    }

    private KafkaConnectorResponse constructKafkaConnectorResponse(RecordMetadata recordMetadata) {
        KafkaConnectorResponse kafkaConnectorResponse = new KafkaConnectorResponse();
        kafkaConnectorResponse.setTopic(recordMetadata.topic());
        kafkaConnectorResponse.setPartition(recordMetadata.partition());
        kafkaConnectorResponse.setOffset(recordMetadata.offset());
        kafkaConnectorResponse.setTimestamp(recordMetadata.timestamp());
        return kafkaConnectorResponse;
    }
}
