package io.camunda.connector.kafka.outbound;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import io.camunda.connector.api.annotation.OutboundConnector;
import io.camunda.connector.api.error.ConnectorException;
import io.camunda.connector.api.json.ConnectorsObjectMapperSupplier;
import io.camunda.connector.api.outbound.OutboundConnectorContext;
import io.camunda.connector.api.outbound.OutboundConnectorFunction;
import io.camunda.connector.generator.java.annotation.ElementTemplate;
import io.camunda.connector.kafka.model.KafkaPropertiesUtil;
import io.camunda.connector.kafka.outbound.model.KafkaConnectorRequest;
import io.camunda.connector.kafka.outbound.model.KafkaConnectorResponse;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.avro.Schema;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

@OutboundConnector(name = "Kafka Producer", inputVariables = {"authentication", "topic", "message", "additionalProperties", "headers", "avro"}, type = "io.camunda:connector-kafka:1")
@ElementTemplate(id = "io.camunda.connectors.KAFKA.v1", name = "Kafka Outbound Connector", description = "Produce Kafka message", inputDataClass = KafkaConnectorRequest.class, version = 4, propertyGroups = {@ElementTemplate.PropertyGroup(id = "authentication", label = "Authentication"), @ElementTemplate.PropertyGroup(id = "kafka", label = "Kafka"), @ElementTemplate.PropertyGroup(id = "message", label = "Message")}, documentationRef = "https://docs.camunda.io/docs/components/connectors/out-of-the-box-connectors/kafka/?kafka=outbound", icon = "icon.svg")
/* loaded from: input_file:io/camunda/connector/kafka/outbound/KafkaConnectorFunction.class */
public class KafkaConnectorFunction implements OutboundConnectorFunction {
    private final Function<Properties, Producer<String, Object>> producerCreatorFunction;
    private static final ObjectMapper objectMapper = ConnectorsObjectMapperSupplier.getCopy().enable(new JsonParser.Feature[]{JsonParser.Feature.ALLOW_SINGLE_QUOTES});

    public KafkaConnectorFunction() {
        this(KafkaProducer::new);
    }

    public KafkaConnectorFunction(Function<Properties, Producer<String, Object>> function) {
        this.producerCreatorFunction = function;
    }

    public Object execute(OutboundConnectorContext outboundConnectorContext) {
        return executeConnector((KafkaConnectorRequest) outboundConnectorContext.bindVariables(KafkaConnectorRequest.class));
    }

    public static byte[] produceAvroMessage(KafkaConnectorRequest kafkaConnectorRequest) throws Exception {
        AvroSchema avroSchema = new AvroSchema(new Schema.Parser().setValidate(true).parse(kafkaConnectorRequest.avro().schema()));
        AvroMapper avroMapper = new AvroMapper();
        Object value = kafkaConnectorRequest.message().value();
        if (value instanceof String) {
            value = objectMapper.readTree((String) value);
        }
        return avroMapper.writer(avroSchema).writeValueAsBytes(value);
    }

    private KafkaConnectorResponse executeConnector(KafkaConnectorRequest kafkaConnectorRequest) {
        try {
            Producer<String, Object> apply = this.producerCreatorFunction.apply(KafkaPropertiesUtil.assembleKafkaClientProperties(kafkaConnectorRequest));
            try {
                Map<String, String> headers = kafkaConnectorRequest.headers() != null ? kafkaConnectorRequest.headers() : new HashMap<>();
                ProducerRecord<String, Object> createProducerRecord = createProducerRecord(kafkaConnectorRequest);
                addHeadersToProducerRecord(createProducerRecord, headers);
                KafkaConnectorResponse constructKafkaConnectorResponse = constructKafkaConnectorResponse((RecordMetadata) apply.send(createProducerRecord).get(45L, TimeUnit.SECONDS));
                if (apply != null) {
                    apply.close();
                }
                return constructKafkaConnectorResponse;
            } finally {
            }
        } catch (Exception e) {
            throw new ConnectorException("FAIL", "Error during Kafka Producer execution; error message: [" + e.getMessage() + "]", e);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v12, types: [byte[]] */
    private ProducerRecord<String, Object> createProducerRecord(KafkaConnectorRequest kafkaConnectorRequest) throws Exception {
        return new ProducerRecord<>(kafkaConnectorRequest.topic().topicName(), (Integer) null, (Long) null, transformData(kafkaConnectorRequest.message().key()), kafkaConnectorRequest.avro() != null ? produceAvroMessage(kafkaConnectorRequest) : transformData(kafkaConnectorRequest.message().value()));
    }

    public static String transformData(Object obj) throws JsonProcessingException {
        return obj instanceof String ? (String) obj : objectMapper.writeValueAsString(obj);
    }

    private void addHeadersToProducerRecord(ProducerRecord<String, Object> producerRecord, Map<String, String> map) {
        if (map != null) {
            for (Map.Entry<String, String> entry : map.entrySet()) {
                producerRecord.headers().add(entry.getKey(), entry.getValue().getBytes(StandardCharsets.UTF_8));
            }
        }
    }

    private KafkaConnectorResponse constructKafkaConnectorResponse(RecordMetadata recordMetadata) {
        return new KafkaConnectorResponse(recordMetadata.topic(), recordMetadata.timestamp(), recordMetadata.offset(), recordMetadata.partition());
    }
}
