package io.confluent.kafkarest.resources;

import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestContext;
import io.confluent.kafkarest.ProducerPool;
import io.confluent.kafkarest.RecordMetadataOrException;
import io.confluent.kafkarest.Utils;
import io.confluent.kafkarest.entities.AvroTopicProduceRecord;
import io.confluent.kafkarest.entities.BinaryTopicProduceRecord;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.JsonTopicProduceRecord;
import io.confluent.kafkarest.entities.PartitionOffset;
import io.confluent.kafkarest.entities.ProduceResponse;
import io.confluent.kafkarest.entities.Topic;
import io.confluent.kafkarest.entities.TopicProduceRecord;
import io.confluent.kafkarest.entities.TopicProduceRequest;
import io.confluent.rest.annotations.PerformanceMetric;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Vector;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Produces({"application/vnd.kafka.v1+json; qs=0.9", "application/vnd.kafka+json; qs=0.8", "application/json; qs=0.5", "application/vnd.kafka.v2+json; qs=0.9"})
@Path("/topics")
@Consumes({"application/vnd.kafka.v1+json", "application/vnd.kafka+json", "application/json", "application/octet-stream", "application/vnd.kafka.v2+json"})
/* loaded from: input_file:io/confluent/kafkarest/resources/TopicsResource.class */
public class TopicsResource {
    private static final Logger log = LoggerFactory.getLogger(TopicsResource.class);
    private final KafkaRestContext ctx;

    public TopicsResource(KafkaRestContext kafkaRestContext) {
        this.ctx = kafkaRestContext;
    }

    @GET
    @PerformanceMetric("topics.list")
    public Collection<String> list() throws Exception {
        return this.ctx.getAdminClientWrapper().getTopicNames();
    }

    @GET
    @Path("/{topic}")
    @PerformanceMetric("topic.get")
    public Topic getTopic(@PathParam("topic") String str) throws Exception {
        Topic topic = this.ctx.getAdminClientWrapper().getTopic(str);
        if (topic == null) {
            throw Errors.topicNotFoundException();
        }
        return topic;
    }

    @Path("/{topic}")
    @Consumes({"application/vnd.kafka.binary.v1+json", "application/vnd.kafka.v1+json", "application/vnd.kafka+json", "application/json", "application/octet-stream", "application/vnd.kafka.binary.v2+json", "application/vnd.kafka.v2+json"})
    @POST
    @PerformanceMetric("topic.produce-binary")
    public void produceBinary(@Suspended AsyncResponse asyncResponse, @PathParam("topic") String str, @NotNull @Valid TopicProduceRequest<BinaryTopicProduceRecord> topicProduceRequest) {
        produce(asyncResponse, str, EmbeddedFormat.BINARY, topicProduceRequest);
    }

    @Path("/{topic}")
    @Consumes({"application/vnd.kafka.json.v1+json", "application/vnd.kafka.json.v2+json"})
    @POST
    @PerformanceMetric("topic.produce-json")
    public void produceJson(@Suspended AsyncResponse asyncResponse, @PathParam("topic") String str, @NotNull @Valid TopicProduceRequest<JsonTopicProduceRecord> topicProduceRequest) {
        produce(asyncResponse, str, EmbeddedFormat.JSON, topicProduceRequest);
    }

    @Path("/{topic}")
    @Consumes({"application/vnd.kafka.avro.v1+json", "application/vnd.kafka.avro.v2+json"})
    @POST
    @PerformanceMetric("topic.produce-avro")
    public void produceAvro(@Suspended AsyncResponse asyncResponse, @PathParam("topic") String str, @NotNull @Valid TopicProduceRequest<AvroTopicProduceRecord> topicProduceRequest) {
        boolean z = false;
        boolean z2 = false;
        for (AvroTopicProduceRecord avroTopicProduceRecord : topicProduceRequest.getRecords()) {
            z = z || !avroTopicProduceRecord.getJsonKey().isNull();
            z2 = z2 || !avroTopicProduceRecord.getJsonValue().isNull();
        }
        if (z && topicProduceRequest.getKeySchema() == null && topicProduceRequest.getKeySchemaId() == null) {
            throw Errors.keySchemaMissingException();
        }
        if (z2 && topicProduceRequest.getValueSchema() == null && topicProduceRequest.getValueSchemaId() == null) {
            throw Errors.valueSchemaMissingException();
        }
        produce(asyncResponse, str, EmbeddedFormat.AVRO, topicProduceRequest);
    }

    public <K, V, R extends TopicProduceRecord<K, V>> void produce(final AsyncResponse asyncResponse, String str, EmbeddedFormat embeddedFormat, TopicProduceRequest<R> topicProduceRequest) {
        log.trace("Executing topic produce request id={} topic={} format={} request={}", new Object[]{asyncResponse, str, embeddedFormat, topicProduceRequest});
        this.ctx.getProducerPool().produce(str, null, embeddedFormat, topicProduceRequest, topicProduceRequest.getRecords(), new ProducerPool.ProduceRequestCallback() { // from class: io.confluent.kafkarest.resources.TopicsResource.1
            @Override // io.confluent.kafkarest.ProducerPool.ProduceRequestCallback
            public void onCompletion(Integer num, Integer num2, List<RecordMetadataOrException> list) {
                ProduceResponse produceResponse = new ProduceResponse();
                Vector vector = new Vector();
                for (RecordMetadataOrException recordMetadataOrException : list) {
                    if (recordMetadataOrException.getException() != null) {
                        int errorCodeFromProducerException = Utils.errorCodeFromProducerException(recordMetadataOrException.getException());
                        vector.add(new PartitionOffset(null, null, Integer.valueOf(errorCodeFromProducerException), recordMetadataOrException.getException().getMessage()));
                    } else {
                        vector.add(new PartitionOffset(Integer.valueOf(recordMetadataOrException.getRecordMetadata().partition()), Long.valueOf(recordMetadataOrException.getRecordMetadata().offset()), null, null));
                    }
                }
                produceResponse.setOffsets(vector);
                produceResponse.setKeySchemaId(num);
                produceResponse.setValueSchemaId(num2);
                TopicsResource.log.trace("Completed topic produce request id={} response={}", asyncResponse, produceResponse);
                asyncResponse.resume(Response.status(Utils.produceRequestStatus(produceResponse)).entity(produceResponse).build());
            }
        });
    }
}
