package io.camunda.connector.kafka.outbound;

import io.camunda.connector.api.annotation.OutboundConnector;
import io.camunda.connector.api.error.ConnectorException;
import io.camunda.connector.api.outbound.OutboundConnectorContext;
import io.camunda.connector.api.outbound.OutboundConnectorFunction;
import io.camunda.connector.kafka.outbound.model.KafkaConnectorRequest;
import io.camunda.connector.kafka.outbound.model.KafkaConnectorResponse;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@OutboundConnector(name = "KAFKA", inputVariables = {"authentication", "topic", "message", "additionalProperties"}, type = "io.camunda:connector-kafka:1")
/* loaded from: input_file:io/camunda/connector/kafka/outbound/KafkaConnectorFunction.class */
public class KafkaConnectorFunction implements OutboundConnectorFunction {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) KafkaConnectorFunction.class);
    private final Function<Properties, Producer> producerCreatorFunction;

    public KafkaConnectorFunction() {
        this(KafkaProducer::new);
    }

    public KafkaConnectorFunction(Function<Properties, Producer> function) {
        this.producerCreatorFunction = function;
    }

    @Override // io.camunda.connector.api.outbound.OutboundConnectorFunction
    public Object execute(OutboundConnectorContext outboundConnectorContext) throws Exception {
        LOGGER.debug("Executing Kafka connector with context: " + outboundConnectorContext);
        KafkaConnectorRequest kafkaConnectorRequest = (KafkaConnectorRequest) outboundConnectorContext.getVariablesAsType(KafkaConnectorRequest.class);
        outboundConnectorContext.validate(kafkaConnectorRequest);
        outboundConnectorContext.replaceSecrets(kafkaConnectorRequest);
        return executeConnector(kafkaConnectorRequest);
    }

    private KafkaConnectorResponse executeConnector(KafkaConnectorRequest kafkaConnectorRequest) {
        Producer apply = this.producerCreatorFunction.apply(kafkaConnectorRequest.assembleKafkaClientProperties());
        try {
            try {
                Future<RecordMetadata> send = apply.send(new ProducerRecord(kafkaConnectorRequest.getTopic().getTopicName(), kafkaConnectorRequest.getMessage().getKey().toString(), kafkaConnectorRequest.getMessage().getValue().toString()));
                KafkaConnectorResponse kafkaConnectorResponse = new KafkaConnectorResponse();
                RecordMetadata recordMetadata = send.get(45L, TimeUnit.SECONDS);
                kafkaConnectorResponse.setTopic(recordMetadata.topic());
                kafkaConnectorResponse.setPartition(recordMetadata.partition());
                kafkaConnectorResponse.setOffset(recordMetadata.offset());
                kafkaConnectorResponse.setTimestamp(recordMetadata.timestamp());
                apply.close();
                return kafkaConnectorResponse;
            } catch (Exception e) {
                throw new ConnectorException("FAIL", "Kafka Producer execution exception", e);
            }
        } catch (Throwable th) {
            apply.close();
            throw th;
        }
    }
}
