package datahub.shaded.io.confluent.kafka.schemaregistry.rules;

import datahub.shaded.io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import datahub.shaded.io.confluent.kafka.schemaregistry.utils.JacksonMapper;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import datahub.shaded.org.apache.kafka.clients.producer.KafkaProducer;
import datahub.shaded.org.apache.kafka.clients.producer.ProducerConfig;
import datahub.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import datahub.shaded.org.apache.kafka.common.errors.SerializationException;
import datahub.shaded.org.apache.kafka.common.header.Headers;
import datahub.shaded.org.apache.kafka.common.serialization.DoubleSerializer;
import datahub.shaded.org.apache.kafka.common.serialization.FloatSerializer;
import datahub.shaded.org.apache.kafka.common.serialization.IntegerSerializer;
import datahub.shaded.org.apache.kafka.common.serialization.LongSerializer;
import datahub.shaded.org.apache.kafka.common.serialization.ShortSerializer;
import datahub.shaded.org.apache.kafka.common.utils.Bytes;
import datahub.shaded.org.slf4j.Logger;
import datahub.shaded.org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/* loaded from: input_file:datahub/shaded/io/confluent/kafka/schemaregistry/rules/DlqAction.class */
public class DlqAction implements RuleAction {
    public static final String TYPE = "DLQ";
    public static final String DLQ_TOPIC = "dlq.topic";
    public static final String DLQ_AUTO_FLUSH = "dlq.auto.flush";
    public static final String PRODUCER = "producer";
    public static final String HEADER_PREFIX = "__rule.";
    public static final String RULE_NAME = "__rule.name";
    public static final String RULE_MODE = "__rule.mode";
    public static final String RULE_SUBJECT = "__rule.subject";
    public static final String RULE_TOPIC = "__rule.topic";
    public static final String RULE_EXCEPTION = "__rule.exception";
    private Map<String, ?> configs;
    private String topic;
    private boolean autoFlush;
    private volatile KafkaProducer<byte[], byte[]> producer;
    private static final Logger log = LoggerFactory.getLogger((Class<?>) DlqAction.class);
    private static final LongSerializer LONG_SERIALIZER = new LongSerializer();
    private static final IntegerSerializer INT_SERIALIZER = new IntegerSerializer();
    private static final ShortSerializer SHORT_SERIALIZER = new ShortSerializer();
    private static final DoubleSerializer DOUBLE_SERIALIZER = new DoubleSerializer();
    private static final FloatSerializer FLOAT_SERIALIZER = new FloatSerializer();

    @Override // datahub.shaded.io.confluent.kafka.schemaregistry.rules.RuleBase
    public boolean addOriginalConfigs() {
        return true;
    }

    @Override // datahub.shaded.io.confluent.kafka.schemaregistry.rules.RuleBase, datahub.shaded.org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        this.configs = map;
        this.topic = (String) map.get(DLQ_TOPIC);
        Object obj = map.get(DLQ_AUTO_FLUSH);
        if (obj != null) {
            this.autoFlush = Boolean.parseBoolean(obj.toString());
        }
        this.producer = (KafkaProducer) map.get("producer");
    }

    @Override // datahub.shaded.io.confluent.kafka.schemaregistry.rules.RuleBase
    public String type() {
        return TYPE;
    }

    public String topic() {
        return this.topic;
    }

    private KafkaProducer<byte[], byte[]> producer() {
        if (this.producer == null) {
            Map<String, Object> baseProducerConfigs = baseProducerConfigs();
            baseProducerConfigs.putAll(this.configs);
            synchronized (this) {
                if (this.producer == null) {
                    this.producer = new KafkaProducer<>(baseProducerConfigs);
                }
            }
        }
        return this.producer;
    }

    @Override // datahub.shaded.io.confluent.kafka.schemaregistry.rules.RuleAction
    public void run(RuleContext ruleContext, Object obj, RuleException ruleException) throws RuleException {
        String str = topic();
        if (str == null || str.isEmpty()) {
            str = ruleContext.getParameter(DLQ_TOPIC);
        }
        if (str == null || str.isEmpty()) {
            throw new SerializationException("Could not send to DLQ as no topic is configured");
        }
        String str2 = str;
        try {
            ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(str2, (Integer) null, convertToBytes(ruleContext, ruleContext.originalKey()), convertToBytes(ruleContext, ruleContext.originalValue()), ruleContext.headers());
            populateHeaders(ruleContext, producerRecord, ruleException);
            producer().send(producerRecord, (recordMetadata, exc) -> {
                if (exc != null) {
                    log.error("Could not produce message to DLQ topic {}", str2, exc);
                } else {
                    log.info("Sent message to DLQ topic {}", str2);
                }
            });
            if (this.autoFlush) {
                this.producer.flush();
            }
        } catch (Exception e) {
            log.error("Could not produce message to DLQ topic {}", str2, e);
        }
        String str3 = "Rule failed: " + ruleContext.rule().getName();
        if (ruleException == null) {
            throw new SerializationException(str3);
        }
    }

    private byte[] convertToBytes(RuleContext ruleContext, Object obj) throws IOException {
        if (obj == null) {
            return null;
        }
        if (obj instanceof byte[]) {
            return (byte[]) obj;
        }
        if (!(obj instanceof ByteBuffer)) {
            return obj instanceof Bytes ? ((Bytes) obj).get() : ((obj instanceof String) || (obj instanceof UUID)) ? obj.toString().getBytes(StandardCharsets.UTF_8) : obj instanceof Long ? LONG_SERIALIZER.serialize(ruleContext.topic(), (Long) obj) : obj instanceof Integer ? INT_SERIALIZER.serialize(ruleContext.topic(), (Integer) obj) : obj instanceof Short ? SHORT_SERIALIZER.serialize(ruleContext.topic(), (Short) obj) : obj instanceof Double ? DOUBLE_SERIALIZER.serialize(ruleContext.topic(), (Double) obj) : obj instanceof Float ? FLOAT_SERIALIZER.serialize(ruleContext.topic(), (Float) obj) : convertToJsonBytes(ruleContext, obj);
        }
        ByteBuffer byteBuffer = (ByteBuffer) obj;
        byte[] bArr = new byte[byteBuffer.remaining()];
        byteBuffer.get(bArr);
        return bArr;
    }

    private byte[] convertToJsonBytes(RuleContext ruleContext, Object obj) throws IOException {
        return JacksonMapper.INSTANCE.writeValueAsBytes(ruleContext.target().toJson(obj));
    }

    private void populateHeaders(RuleContext ruleContext, ProducerRecord<byte[], byte[]> producerRecord, RuleException ruleException) {
        Headers headers = producerRecord.headers();
        headers.add(RULE_NAME, toBytes(ruleContext.rule().getName()));
        headers.add(RULE_MODE, toBytes(ruleContext.ruleMode().name()));
        headers.add(RULE_SUBJECT, toBytes(ruleContext.subject()));
        headers.add(RULE_TOPIC, toBytes(ruleContext.topic()));
        if (ruleException != null) {
            headers.add(RULE_EXCEPTION, toBytes(ruleException.getMessage()));
        }
    }

    private byte[] toBytes(String str) {
        if (str != null) {
            return str.getBytes(StandardCharsets.UTF_8);
        }
        return null;
    }

    static Map<String, Object> baseProducerConfigs() {
        HashMap hashMap = new HashMap();
        hashMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "datahub.shaded.org.apache.kafka.common.serialization.ByteArraySerializer");
        hashMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "datahub.shaded.org.apache.kafka.common.serialization.ByteArraySerializer");
        hashMap.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.toString(NetworkClientDelegate.PollResult.WAIT_FOREVER));
        hashMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
        hashMap.put(ProducerConfig.ACKS_CONFIG, "all");
        hashMap.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, Schema.VERSION_EXAMPLE);
        hashMap.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));
        return hashMap;
    }

    @Override // datahub.shaded.io.confluent.kafka.schemaregistry.rules.RuleBase, java.lang.AutoCloseable
    public void close() {
        if (this.producer != null) {
            this.producer.close();
        }
    }
}
