package io.confluent.mqtt.stream.netty;

import io.confluent.mqtt.MqttConfig;
import io.confluent.mqtt.stream.DeliveryGuarantee;
import io.confluent.mqtt.stream.KafkaPublisher;
import io.confluent.mqtt.stream.PublishMqttRecord;
import io.confluent.mqtt.stream.StreamConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.nio.charset.StandardCharsets;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Outbound Netty handler that publishes {@link PublishMqttRecord} messages to Kafka.
 *
 * <p>One Kafka producer is kept per {@link DeliveryGuarantee}; the producer used for a record is
 * selected by {@link PublishMqttRecord#deliveryGuarantee()}. Any other outbound message is passed
 * on to the next handler untouched.
 *
 * <p>The handler is annotated {@code @Sharable} and is handed out as a lazily created,
 * process-wide singleton through {@link #instance(MqttConfig)}.
 */
@ChannelHandler.Sharable
/* loaded from: input_file:io/confluent/mqtt/stream/netty/NettyKafkaPublisher.class */
public class NettyKafkaPublisher extends ChannelOutboundHandlerAdapter implements KafkaPublisher {
    private static final Logger log = LoggerFactory.getLogger(NettyKafkaPublisher.class);

    /** Lazily created singleton; guarded by the class monitor in {@link #instance(MqttConfig)}. */
    private static NettyKafkaPublisher instance;

    /** Producer to use for each supported delivery guarantee. */
    private final Map<DeliveryGuarantee, Producer<String, byte[]>> producers;

    /**
     * Creates a publisher backed by the given producers.
     *
     * @param map producer to use for each delivery guarantee; guarantees without an entry are
     *            reported as unsupported when a record requests them
     */
    protected NettyKafkaPublisher(Map<DeliveryGuarantee, Producer<String, byte[]>> map) {
        this.producers = map;
    }

    /**
     * Returns the shared publisher, creating it (and its Kafka producers) on the first call.
     *
     * <p>Only the configuration passed to the first call is used; later calls return the existing
     * instance and ignore their argument.
     *
     * @param mqttConfig configuration used to build the producers on first use
     * @return the shared publisher instance
     */
    public static synchronized NettyKafkaPublisher instance(MqttConfig mqttConfig) {
        if (instance == null) {
            instance = new NettyKafkaPublisher(newProducers(mqttConfig));
        }
        return instance;
    }

    /**
     * Builds one producer per delivery guarantee. The settings applied here are put after the
     * user-supplied producer overrides and therefore take precedence over them.
     */
    private static Map<DeliveryGuarantee, Producer<String, byte[]>> newProducers(MqttConfig mqttConfig) {
        Map<DeliveryGuarantee, Producer<String, byte[]>> producersByGuarantee = new EnumMap<>(DeliveryGuarantee.class);

        // At most once: fire and forget, no broker acknowledgement.
        Map<String, Object> atMostOnceProps = producerProperties(mqttConfig);
        atMostOnceProps.put("acks", "0");
        producersByGuarantee.put(DeliveryGuarantee.AT_MOST_ONCE, new KafkaProducer<String, byte[]>(atMostOnceProps));

        // At least once: acks=all.
        Map<String, Object> atLeastOnceProps = producerProperties(mqttConfig);
        atLeastOnceProps.put("acks", "all");
        producersByGuarantee.put(DeliveryGuarantee.AT_LEAST_ONCE, new KafkaProducer<String, byte[]>(atLeastOnceProps));

        // Exactly once: acks=all plus the idempotent producer.
        Map<String, Object> exactlyOnceProps = producerProperties(mqttConfig);
        exactlyOnceProps.put("acks", "all");
        exactlyOnceProps.put("enable.idempotence", "true");
        producersByGuarantee.put(DeliveryGuarantee.EXACTLY_ONCE, new KafkaProducer<String, byte[]>(exactlyOnceProps));

        return producersByGuarantee;
    }

    /**
     * Returns a fresh, mutable set of base producer properties: bootstrap servers and the
     * String/byte[] serializers, followed by every original setting carrying
     * {@link StreamConfig#PRODUCER_PREFIX} (which may override the base values).
     */
    private static Map<String, Object> producerProperties(MqttConfig mqttConfig) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", mqttConfig.bootstrapServers());
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.putAll(mqttConfig.originalsWithPrefix(StreamConfig.PRODUCER_PREFIX));
        return props;
    }

    /**
     * Publishes {@link PublishMqttRecord} messages to Kafka and completes the promise from the
     * producer callback: success once the send completes without error, failure with the send
     * exception otherwise. Messages of any other type are forwarded to the next handler.
     *
     * @param channelHandlerContext context of this handler
     * @param obj                   outbound message
     * @param channelPromise        promise to complete once the write has been handled
     */
    @Override
    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) {
        if (!(obj instanceof PublishMqttRecord)) {
            // Forward the caller's promise as well; write(obj) alone would create a new promise
            // and leave the one we were given uncompleted forever.
            channelHandlerContext.write(obj, channelPromise);
            return;
        }
        String channelId = channelHandlerContext.channel().id().asShortText();
        PublishMqttRecord publishMqttRecord = (PublishMqttRecord) obj;
        log.debug("Publishing MQTT message to Kafka with QoS level '{}'", publishMqttRecord.deliveryGuarantee());
        log.trace("Channel {} publishing message to Kafka {} ", channelId, publishMqttRecord);
        try {
            publish(publishMqttRecord, (recordMetadata, exc) -> {
                if (exc != null) {
                    channelPromise.setFailure(exc);
                    log.error("Channel {} publishing to Kafka failed for {}", channelId, publishMqttRecord, exc);
                } else {
                    channelPromise.setSuccess();
                    log.trace("Success in channel {} publishing to topic '{}', partition '{}', offset '{}'",
                            channelId, recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
                }
            });
        } catch (Exception e) {
            // Pass the exception both as the message argument and as the trailing throwable so the
            // placeholder is filled and the stack trace is logged regardless of SLF4J version.
            log.error("Channel {} publishing to Kafka failed with error {}", channelId, e.toString(), e);
            throw e;
        }
    }

    /**
     * Sends the record with the producer matching its delivery guarantee. If no producer is
     * registered for that guarantee the callback is invoked immediately with an error and no
     * metadata.
     */
    private void publish(PublishMqttRecord publishMqttRecord, Callback callback) {
        DeliveryGuarantee deliveryGuarantee = publishMqttRecord.deliveryGuarantee();
        Producer<String, byte[]> producer = this.producers.get(deliveryGuarantee);
        if (producer == null) {
            callback.onCompletion((RecordMetadata) null, new RuntimeException("Delivery guarantee " + deliveryGuarantee + " is not supported"));
            return;
        }
        // Partition and timestamp are left null so the producer assigns them.
        ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>(
                publishMqttRecord.topic(),
                (Integer) null,
                (Long) null,
                publishMqttRecord.m16key(),
                publishMqttRecord.m15value(),
                convertToRecordHeaders(publishMqttRecord));
        producer.send(record, callback);
    }

    /**
     * Converts the record's Connect headers to Kafka record headers. Each value is rendered with
     * {@link Values#convertToString} and encoded as UTF-8; a null value is kept as a null header
     * value. A record without headers yields an empty header set.
     */
    private static RecordHeaders convertToRecordHeaders(PublishMqttRecord publishMqttRecord) {
        Headers headers = publishMqttRecord.headers();
        RecordHeaders recordHeaders = new RecordHeaders();
        if (headers != null) {
            for (Header header : headers) {
                String text = Values.convertToString(header.schema(), header.value());
                // convertToString yields null for a null value; avoid the NPE on getBytes.
                byte[] bytes = text == null ? null : text.getBytes(StandardCharsets.UTF_8);
                recordHeaders.add(header.key(), bytes);
            }
        }
        return recordHeaders;
    }
}
