package io.confluent.kafkarest.resources;

import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestContext;
import io.confluent.kafkarest.ProducerPool;
import io.confluent.kafkarest.RecordMetadataOrException;
import io.confluent.kafkarest.Utils;
import io.confluent.kafkarest.entities.AvroProduceRecord;
import io.confluent.kafkarest.entities.BinaryProduceRecord;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.JsonProduceRecord;
import io.confluent.kafkarest.entities.Partition;
import io.confluent.kafkarest.entities.PartitionOffset;
import io.confluent.kafkarest.entities.PartitionProduceRequest;
import io.confluent.kafkarest.entities.ProduceRecord;
import io.confluent.kafkarest.entities.ProduceResponse;
import io.confluent.rest.annotations.PerformanceMetric;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Vector;
import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.ws.rs.BadRequestException;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * JAX-RS resource rooted at {@code /topics/{topic}/partitions}.
 *
 * <p>Exposes partition metadata lookups, per-partition consumption (starting either at an explicit
 * offset or at the offset resolved from a timestamp) and per-partition production in the binary,
 * JSON and Avro embedded formats. All Kafka access is delegated to the components obtained from
 * the {@link KafkaRestContext} supplied at construction time.
 */
@Produces({
    "application/vnd.kafka.binary.v1+json; qs=0.1",
    "application/vnd.kafka.avro.v1+json; qs=0.1",
    "application/vnd.kafka.v1+json; qs=0.9",
    "application/vnd.kafka+json; qs=0.8",
    "application/json; qs=0.5"
})
@Path("/topics/{topic}/partitions")
@Consumes({
    "application/vnd.kafka.v1+json",
    "application/vnd.kafka+json",
    "application/json",
    "application/octet-stream"
})
public class PartitionsResource {
    private static final Logger log = LoggerFactory.getLogger(PartitionsResource.class);

    private final KafkaRestContext ctx;

    /**
     * Creates the resource.
     *
     * @param kafkaRestContext context used to obtain the admin client wrapper, consumer managers
     *     and producer pool
     */
    public PartitionsResource(KafkaRestContext kafkaRestContext) {
        this.ctx = kafkaRestContext;
    }

    /**
     * Lists the partitions of a topic.
     *
     * @param topic topic name taken from the request path
     * @return the partitions reported by the admin client wrapper
     * @throws Exception the topic-not-found error from {@link Errors} if the topic does not exist,
     *     or whatever the admin client wrapper throws
     */
    @GET
    @PerformanceMetric("partitions.list")
    public List<Partition> list(@PathParam("topic") String topic) throws Exception {
        checkTopicExists(topic);
        return ctx.getAdminClientWrapper().getTopicPartitions(topic);
    }

    /**
     * Returns a single partition of a topic.
     *
     * @param topic topic name taken from the request path
     * @param partition partition id taken from the request path
     * @return the partition reported by the admin client wrapper
     * @throws Exception the topic-not-found or partition-not-found error from {@link Errors}, or
     *     whatever the admin client wrapper throws
     */
    @GET
    @Path("/{partition}")
    @PerformanceMetric("partition.get")
    public Partition getPartition(
            @PathParam("topic") String topic, @PathParam("partition") int partition)
            throws Exception {
        checkTopicExists(topic);
        Partition found = ctx.getAdminClientWrapper().getTopicPartition(topic, partition);
        if (found == null) {
            throw Errors.partitionNotFoundException();
        }
        return found;
    }

    /**
     * Consumes records from a partition in the binary embedded format.
     *
     * @param asyncResponse suspended response resumed with the records or with a failure
     * @param topic topic name taken from the request path
     * @param partition partition id taken from the request path
     * @param offset starting offset, or {@code null} when {@code timestamp} is used instead
     * @param timestamp starting timestamp, or {@code null} when {@code offset} is used instead
     * @param count value forwarded to the simple consumer as the record count (defaults to 1)
     * @throws BadRequestException if both or neither of {@code offset} and {@code timestamp} are
     *     given
     */
    @GET
    @Path("/{partition}/messages")
    @Produces({
        "application/vnd.kafka.binary.v1+json",
        "application/vnd.kafka.v1+json; qs=0.9",
        "application/vnd.kafka+json; qs=0.8",
        "application/json; qs=0.5"
    })
    @PerformanceMetric("partition.consume-binary")
    public void consumeBinary(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("topic") String topic,
            @PathParam("partition") int partition,
            @QueryParam("offset") @Nullable Long offset,
            @QueryParam("timestamp") @Nullable Instant timestamp,
            @QueryParam("count") @DefaultValue("1") long count) {
        consumeFromOffsetOrTimestamp(
                asyncResponse, topic, partition, offset, timestamp, count, EmbeddedFormat.BINARY);
    }

    /**
     * Consumes records from a partition in the Avro embedded format.
     *
     * @param asyncResponse suspended response resumed with the records or with a failure
     * @param topic topic name taken from the request path
     * @param partition partition id taken from the request path
     * @param offset starting offset, or {@code null} when {@code timestamp} is used instead
     * @param timestamp starting timestamp, or {@code null} when {@code offset} is used instead
     * @param count value forwarded to the simple consumer as the record count (defaults to 1)
     * @throws BadRequestException if both or neither of {@code offset} and {@code timestamp} are
     *     given
     */
    @GET
    @Path("/{partition}/messages")
    @Produces({"application/vnd.kafka.avro.v1+json; qs=0.1"})
    @PerformanceMetric("partition.consume-avro")
    public void consumeAvro(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("topic") String topic,
            @PathParam("partition") int partition,
            @QueryParam("offset") @Nullable Long offset,
            @QueryParam("timestamp") @Nullable Instant timestamp,
            @QueryParam("count") @DefaultValue("1") long count) {
        consumeFromOffsetOrTimestamp(
                asyncResponse, topic, partition, offset, timestamp, count, EmbeddedFormat.AVRO);
    }

    /**
     * Consumes records from a partition in the JSON embedded format.
     *
     * @param asyncResponse suspended response resumed with the records or with a failure
     * @param topic topic name taken from the request path
     * @param partition partition id taken from the request path
     * @param offset starting offset, or {@code null} when {@code timestamp} is used instead
     * @param timestamp starting timestamp, or {@code null} when {@code offset} is used instead
     * @param count value forwarded to the simple consumer as the record count (defaults to 1)
     * @throws BadRequestException if both or neither of {@code offset} and {@code timestamp} are
     *     given
     */
    @GET
    @Path("/{partition}/messages")
    @Produces({"application/vnd.kafka.json.v1+json; qs=0.1"})
    @PerformanceMetric("partition.consume-json")
    public void consumeJson(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("topic") String topic,
            @PathParam("partition") int partition,
            @QueryParam("offset") @Nullable Long offset,
            @QueryParam("timestamp") @Nullable Instant timestamp,
            @QueryParam("count") @DefaultValue("1") long count) {
        consumeFromOffsetOrTimestamp(
                asyncResponse, topic, partition, offset, timestamp, count, EmbeddedFormat.JSON);
    }

    /**
     * Produces binary records to a specific partition.
     *
     * @param asyncResponse suspended response resumed with the produce result
     * @param topic topic name taken from the request path
     * @param partition partition id taken from the request path
     * @param request validated request body carrying the records
     */
    @POST
    @Path("/{partition}")
    @Consumes({
        "application/vnd.kafka.binary.v1+json",
        "application/vnd.kafka.v1+json",
        "application/vnd.kafka+json",
        "application/json",
        "application/octet-stream"
    })
    @PerformanceMetric("partition.produce-binary")
    public void produceBinary(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("topic") String topic,
            @PathParam("partition") int partition,
            @NotNull @Valid PartitionProduceRequest<BinaryProduceRecord> request) {
        produce(asyncResponse, topic, partition, EmbeddedFormat.BINARY, request);
    }

    /**
     * Produces JSON records to a specific partition.
     *
     * @param asyncResponse suspended response resumed with the produce result
     * @param topic topic name taken from the request path
     * @param partition partition id taken from the request path
     * @param request validated request body carrying the records
     */
    @POST
    @Path("/{partition}")
    @Consumes({"application/vnd.kafka.json.v1+json"})
    @PerformanceMetric("partition.produce-json")
    public void produceJson(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("topic") String topic,
            @PathParam("partition") int partition,
            @NotNull @Valid PartitionProduceRequest<JsonProduceRecord> request) {
        produce(asyncResponse, topic, partition, EmbeddedFormat.JSON, request);
    }

    /**
     * Produces Avro records to a specific partition.
     *
     * <p>Before producing, verifies that a key schema (or key schema id) is supplied when any
     * record has a non-null JSON key, and likewise a value schema (or value schema id) when any
     * record has a non-null JSON value.
     *
     * @param asyncResponse suspended response resumed with the produce result
     * @param topic topic name taken from the request path
     * @param partition partition id taken from the request path
     * @param request validated request body carrying the records and schema information
     */
    @POST
    @Path("/{partition}")
    @Consumes({"application/vnd.kafka.avro.v1+json"})
    @PerformanceMetric("partition.produce-avro")
    public void produceAvro(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("topic") String topic,
            @PathParam("partition") int partition,
            @NotNull @Valid PartitionProduceRequest<AvroProduceRecord> request) {
        boolean hasKeys = false;
        boolean hasValues = false;
        for (AvroProduceRecord record : request.getRecords()) {
            hasKeys = hasKeys || !record.getJsonKey().isNull();
            hasValues = hasValues || !record.getJsonValue().isNull();
        }
        // The key check runs first, so a request missing both schemas reports the key schema.
        if (hasKeys && request.getKeySchema() == null && request.getKeySchemaId() == null) {
            throw Errors.keySchemaMissingException();
        }
        if (hasValues && request.getValueSchema() == null && request.getValueSchemaId() == null) {
            throw Errors.valueSchemaMissingException();
        }
        produce(asyncResponse, topic, partition, EmbeddedFormat.AVRO, request);
    }

    /**
     * Shared implementation of the three consume endpoints: validates that exactly one of
     * {@code offset} / {@code timestamp} was supplied and dispatches accordingly.
     */
    private void consumeFromOffsetOrTimestamp(
            AsyncResponse asyncResponse,
            String topic,
            int partition,
            @Nullable Long offset,
            @Nullable Instant timestamp,
            long count,
            EmbeddedFormat format) {
        // Equal null-ness means either both parameters or neither were supplied.
        if ((offset != null) == (timestamp != null)) {
            throw new BadRequestException("Either `offset` or `timestamp` query parameters must be set.");
        }
        if (offset != null) {
            consumeFromOffset(asyncResponse, topic, partition, offset.longValue(), count, format);
        } else {
            consumeFromTimestamp(asyncResponse, topic, partition, timestamp, count, format);
        }
    }

    /**
     * Resolves {@code timestamp} to an offset through the Kafka consumer manager and consumes
     * from there; resumes the response with an empty list when no offset is returned.
     */
    private void consumeFromTimestamp(
            AsyncResponse asyncResponse,
            String topic,
            int partition,
            Instant timestamp,
            long count,
            EmbeddedFormat format) {
        Optional<Long> resolvedOffset =
                ctx.getKafkaConsumerManager().getOffsetForTime(topic, partition, timestamp);
        if (!resolvedOffset.isPresent()) {
            asyncResponse.resume(Collections.emptyList());
            return;
        }
        consumeFromOffset(
                asyncResponse, topic, partition, resolvedOffset.get().longValue(), count, format);
    }

    /**
     * Asks the simple consumer manager to read from {@code offset} and resumes the response with
     * either the records or the exception handed to the completion callback.
     */
    private <K, V> void consumeFromOffset(
            final AsyncResponse asyncResponse,
            String topic,
            int partition,
            long offset,
            long count,
            EmbeddedFormat format) {
        log.trace(
                "Executing simple consume id={} topic={} partition={} offset={} count={}",
                asyncResponse, topic, partition, offset, count);
        ctx.getSimpleConsumerManager().consume(
                topic,
                partition,
                offset,
                count,
                format,
                new ConsumerReadCallback<K, V>() {
                    @Override
                    public void onCompletion(
                            List<? extends ConsumerRecord<K, V>> records, Exception e) {
                        log.trace(
                                "Completed simple consume id={} records={} exception={}",
                                asyncResponse, records, e);
                        if (e != null) {
                            asyncResponse.resume(e);
                        } else {
                            asyncResponse.resume(records);
                        }
                    }
                });
    }

    /**
     * Hands the request's records to the producer pool and, on completion, resumes the response
     * with a {@link ProduceResponse} holding one {@link PartitionOffset} per result.
     *
     * <p>A result carrying an exception becomes an entry with only an error code and message; any
     * other result becomes an entry with only the partition and offset. The HTTP status is derived
     * from the assembled response via {@code Utils.produceRequestStatus}.
     *
     * @param asyncResponse suspended response to resume once the produce completes
     * @param topic topic to produce to
     * @param partition partition to produce to
     * @param format embedded format of the records
     * @param request request body carrying the records
     */
    protected <K, V, R extends ProduceRecord<K, V>> void produce(
            final AsyncResponse asyncResponse,
            String topic,
            int partition,
            EmbeddedFormat format,
            PartitionProduceRequest<R> request) {
        log.trace(
                "Executing topic produce request id={} topic={} partition={} format={} request={}",
                asyncResponse, topic, partition, format, request);
        ctx.getProducerPool().produce(
                topic,
                Integer.valueOf(partition),
                format,
                request,
                request.getRecords(),
                new ProducerPool.ProduceRequestCallback() {
                    @Override
                    public void onCompletion(
                            Integer keySchemaId,
                            Integer valueSchemaId,
                            List<RecordMetadataOrException> results) {
                        List<PartitionOffset> offsets = new ArrayList<>(results.size());
                        for (RecordMetadataOrException result : results) {
                            if (result.getException() != null) {
                                int errorCode =
                                        Utils.errorCodeFromProducerException(result.getException());
                                offsets.add(
                                        new PartitionOffset(
                                                null,
                                                null,
                                                Integer.valueOf(errorCode),
                                                result.getException().getMessage()));
                            } else {
                                offsets.add(
                                        new PartitionOffset(
                                                Integer.valueOf(result.getRecordMetadata().partition()),
                                                Long.valueOf(result.getRecordMetadata().offset()),
                                                null,
                                                null));
                            }
                        }

                        ProduceResponse response = new ProduceResponse();
                        response.setOffsets(offsets);
                        response.setKeySchemaId(keySchemaId);
                        response.setValueSchemaId(valueSchemaId);
                        log.trace(
                                "Completed topic produce request id={} response={}",
                                asyncResponse, response);
                        asyncResponse.resume(
                                Response.status(Utils.produceRequestStatus(response))
                                        .entity(response)
                                        .build());
                    }
                });
    }

    /** Returns whether the admin client wrapper reports that {@code topic} exists. */
    private boolean topicExists(String topic) throws Exception {
        return ctx.getAdminClientWrapper().topicExists(topic);
    }

    /** Throws the topic-not-found error from {@link Errors} unless {@code topic} exists. */
    private void checkTopicExists(String topic) throws Exception {
        if (!topicExists(topic)) {
            throw Errors.topicNotFoundException();
        }
    }
}
