package io.confluent.kafkarest;

import com.fasterxml.jackson.databind.JsonNode;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe;
import io.confluent.kafkarest.converters.ConversionException;
import io.confluent.kafkarest.converters.SchemaConverter;
import io.confluent.kafkarest.entities.ProduceRecord;
import io.confluent.kafkarest.entities.ProduceRequest;
import io.confluent.rest.exceptions.RestException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:io/confluent/kafkarest/SchemaRestProducer.class */
public class SchemaRestProducer implements RestProducer<JsonNode, JsonNode> {
    protected final KafkaProducer<Object, Object> producer;
    protected final AbstractKafkaSchemaSerDe keySerializer;
    protected final AbstractKafkaSchemaSerDe valueSerializer;
    protected final SchemaProvider schemaProvider;
    protected final SchemaConverter schemaConverter;
    protected final Map<ParsedSchema, Integer> schemaIdCache = new ConcurrentHashMap(100);

    public SchemaRestProducer(KafkaProducer<Object, Object> kafkaProducer, AbstractKafkaSchemaSerDe abstractKafkaSchemaSerDe, AbstractKafkaSchemaSerDe abstractKafkaSchemaSerDe2, SchemaProvider schemaProvider, SchemaConverter schemaConverter) {
        this.producer = kafkaProducer;
        this.keySerializer = abstractKafkaSchemaSerDe;
        this.valueSerializer = abstractKafkaSchemaSerDe2;
        this.schemaProvider = schemaProvider;
        this.schemaConverter = schemaConverter;
    }

    @Override // io.confluent.kafkarest.RestProducer
    public void produce(ProduceTask produceTask, String str, Integer num, Collection<? extends ProduceRecord<JsonNode, JsonNode>> collection) {
        ProduceRequest<?, ?> schemaHolder = produceTask.getSchemaHolder();
        ParsedSchema parsedSchema = null;
        ParsedSchema parsedSchema2 = null;
        Integer keySchemaId = schemaHolder.getKeySchemaId();
        Integer valueSchemaId = schemaHolder.getValueSchemaId();
        try {
            if (keySchemaId != null) {
                parsedSchema = this.keySerializer.getSchemaById(keySchemaId.intValue());
            } else if (schemaHolder.getKeySchema() != null) {
                parsedSchema = (ParsedSchema) this.schemaProvider.parseSchema(schemaHolder.getKeySchema(), Collections.emptyList()).orElseThrow(() -> {
                    return Errors.invalidSchemaException(schemaHolder.getKeySchema());
                });
                if (this.schemaIdCache.containsKey(parsedSchema)) {
                    keySchemaId = this.schemaIdCache.get(parsedSchema);
                    parsedSchema = this.keySerializer.getSchemaById(keySchemaId.intValue());
                } else {
                    keySchemaId = Integer.valueOf(this.keySerializer.register(str + "-key", parsedSchema));
                    this.schemaIdCache.put(parsedSchema, keySchemaId);
                }
            }
            if (valueSchemaId != null) {
                parsedSchema2 = this.valueSerializer.getSchemaById(valueSchemaId.intValue());
            } else if (schemaHolder.getValueSchema() != null) {
                parsedSchema2 = (ParsedSchema) this.schemaProvider.parseSchema(schemaHolder.getValueSchema(), Collections.emptyList()).orElseThrow(() -> {
                    return Errors.invalidSchemaException(schemaHolder.getValueSchema());
                });
                if (this.schemaIdCache.containsKey(parsedSchema2)) {
                    valueSchemaId = this.schemaIdCache.get(parsedSchema2);
                    parsedSchema2 = this.valueSerializer.getSchemaById(valueSchemaId.intValue());
                } else {
                    valueSchemaId = Integer.valueOf(this.valueSerializer.register(str + "-value", parsedSchema2));
                    this.schemaIdCache.put(parsedSchema2, valueSchemaId);
                }
            }
            produceTask.setSchemaIds(keySchemaId, valueSchemaId);
            ArrayList arrayList = new ArrayList();
            try {
                for (ProduceRecord<JsonNode, JsonNode> produceRecord : collection) {
                    Object object = parsedSchema != null ? this.schemaConverter.toObject(produceRecord.getKey(), parsedSchema) : null;
                    Object object2 = parsedSchema2 != null ? this.schemaConverter.toObject(produceRecord.getValue(), parsedSchema2) : null;
                    Integer num2 = num;
                    if (num2 == null) {
                        num2 = produceRecord.getPartition();
                    }
                    arrayList.add(new ProducerRecord(str, num2, object, object2));
                }
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    this.producer.send((ProducerRecord) it.next(), produceTask.createCallback());
                }
            } catch (ConversionException e) {
                throw Errors.jsonConversionException(e);
            }
        } catch (IOException e2) {
            throw new RestException("Schema registration or lookup failed", 408, 40801, e2);
        } catch (RestClientException e3) {
            throw new RestException("Schema registration or lookup failed", 408, 40801, e3);
        }
    }

    @Override // io.confluent.kafkarest.RestProducer
    public void close() {
        this.producer.close();
    }
}
