package io.confluent.kafkarest.resources;

import io.confluent.kafkarest.AvroConsumerState;
import io.confluent.kafkarest.BinaryConsumerState;
import io.confluent.kafkarest.ConsumerManager;
import io.confluent.kafkarest.ConsumerState;
import io.confluent.kafkarest.Context;
import io.confluent.kafkarest.UriUtils;
import io.confluent.kafkarest.Versions;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.CreateConsumerInstanceResponse;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.rest.annotations.PerformanceMetric;
import java.util.List;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.UriInfo;

@Produces({Versions.KAFKA_V1_JSON_BINARY_WEIGHTED_LOW, Versions.KAFKA_V1_JSON_AVRO_WEIGHTED_LOW, Versions.KAFKA_V1_JSON_WEIGHTED, Versions.KAFKA_DEFAULT_JSON_WEIGHTED, Versions.JSON_WEIGHTED})
@Path("/consumers")
@Consumes({"application/vnd.kafka.binary.v1+json", "application/vnd.kafka.avro.v1+json", "application/vnd.kafka.v1+json", Versions.KAFKA_DEFAULT_JSON, Versions.JSON, Versions.GENERIC_REQUEST})
/* loaded from: input_file:io/confluent/kafkarest/resources/ConsumersResource.class */
public class ConsumersResource {
    private final Context ctx;

    public ConsumersResource(Context context) {
        this.ctx = context;
    }

    @Path("/{group}")
    @Valid
    @POST
    @PerformanceMetric("consumer.create")
    public CreateConsumerInstanceResponse createGroup(@javax.ws.rs.core.Context UriInfo uriInfo, @PathParam("group") String str, @Valid ConsumerInstanceConfig consumerInstanceConfig) {
        if (consumerInstanceConfig == null) {
            consumerInstanceConfig = new ConsumerInstanceConfig();
        }
        String createConsumer = this.ctx.getConsumerManager().createConsumer(str, consumerInstanceConfig);
        return new CreateConsumerInstanceResponse(createConsumer, UriUtils.absoluteUriBuilder(this.ctx.getConfig(), uriInfo).path("instances").path(createConsumer).build(new Object[0]).toString());
    }

    @POST
    @Path("/{group}/instances/{instance}/offsets")
    @PerformanceMetric("consumer.commit")
    public void commitOffsets(@Suspended final AsyncResponse asyncResponse, @PathParam("group") String str, @PathParam("instance") String str2) {
        this.ctx.getConsumerManager().commitOffsets(str, str2, new ConsumerManager.CommitCallback() { // from class: io.confluent.kafkarest.resources.ConsumersResource.1
            @Override // io.confluent.kafkarest.ConsumerManager.CommitCallback
            public void onCompletion(List<TopicPartitionOffset> list, Exception exc) {
                if (exc != null) {
                    asyncResponse.resume(exc);
                } else {
                    asyncResponse.resume(list);
                }
            }
        });
    }

    @Path("/{group}/instances/{instance}")
    @DELETE
    @PerformanceMetric("consumer.delete")
    public void deleteGroup(@PathParam("group") String str, @PathParam("instance") String str2) {
        this.ctx.getConsumerManager().deleteConsumer(str, str2);
    }

    @GET
    @Path("/{group}/instances/{instance}/topics/{topic}")
    @Produces({"application/vnd.kafka.binary.v1+json", Versions.KAFKA_V1_JSON_WEIGHTED, Versions.KAFKA_DEFAULT_JSON_WEIGHTED, Versions.JSON_WEIGHTED, Versions.ANYTHING})
    @PerformanceMetric("consumer.topic.read-binary")
    public void readTopicBinary(@Suspended AsyncResponse asyncResponse, @PathParam("group") String str, @PathParam("instance") String str2, @PathParam("topic") String str3, @QueryParam("max_bytes") @DefaultValue("-1") long j) {
        readTopic(asyncResponse, str, str2, str3, j, BinaryConsumerState.class);
    }

    @GET
    @Path("/{group}/instances/{instance}/topics/{topic}")
    @Produces({"application/vnd.kafka.avro.v1+json"})
    @PerformanceMetric("consumer.topic.read-avro")
    public void readTopicAvro(@Suspended AsyncResponse asyncResponse, @PathParam("group") String str, @PathParam("instance") String str2, @PathParam("topic") String str3, @QueryParam("max_bytes") @DefaultValue("-1") long j) {
        readTopic(asyncResponse, str, str2, str3, j, AvroConsumerState.class);
    }

    private <KafkaK, KafkaV, ClientK, ClientV> void readTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("group") String str, @PathParam("instance") String str2, @PathParam("topic") String str3, @QueryParam("max_bytes") @DefaultValue("-1") long j, Class<? extends ConsumerState<KafkaK, KafkaV, ClientK, ClientV>> cls) {
        this.ctx.getConsumerManager().readTopic(str, str2, str3, cls, j <= 0 ? Long.MAX_VALUE : j, new ConsumerManager.ReadCallback<ClientK, ClientV>() { // from class: io.confluent.kafkarest.resources.ConsumersResource.2
            @Override // io.confluent.kafkarest.ConsumerManager.ReadCallback
            public void onCompletion(List<? extends ConsumerRecord<ClientK, ClientV>> list, Exception exc) {
                if (exc != null) {
                    asyncResponse.resume(exc);
                } else {
                    asyncResponse.resume(list);
                }
            }
        });
    }
}
