package io.confluent.kafkarest.v2;

import io.confluent.kafkarest.ConsumerInstanceId;
import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestConfig;
import io.confluent.kafkarest.RestConfigUtils;
import io.confluent.kafkarest.Time;
import io.confluent.kafkarest.converters.AvroConverter;
import io.confluent.kafkarest.converters.JsonSchemaConverter;
import io.confluent.kafkarest.converters.ProtobufConverter;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.kafkarest.entities.v2.ConsumerAssignmentRequest;
import io.confluent.kafkarest.entities.v2.ConsumerAssignmentResponse;
import io.confluent.kafkarest.entities.v2.ConsumerCommittedRequest;
import io.confluent.kafkarest.entities.v2.ConsumerCommittedResponse;
import io.confluent.kafkarest.entities.v2.ConsumerOffsetCommitRequest;
import io.confluent.kafkarest.entities.v2.ConsumerSeekToOffsetRequest;
import io.confluent.kafkarest.entities.v2.ConsumerSeekToRequest;
import io.confluent.kafkarest.entities.v2.ConsumerSubscriptionRecord;
import io.confluent.kafkarest.entities.v2.ConsumerSubscriptionResponse;
import io.confluent.rest.exceptions.RestNotFoundException;
import io.confluent.rest.exceptions.RestServerErrorException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import javax.ws.rs.core.Response;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafkarest/v2/KafkaConsumerManager.class */
public class KafkaConsumerManager {
    /** Logger shared by this class and its nested helper classes. */
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerManager.class);
    /** Configuration passed to the constructor; also the source of consumer properties. */
    private final KafkaRestConfig config;
    /** Clock taken from {@link #config}; the expiration thread reads the current time from it. */
    private final Time time;
    /** Bootstrap broker list resolved once from the configuration at construction time. */
    private final String bootstrapServers;
    /**
     * Registered consumers keyed by instance id. Accessed under the monitor of {@code this}. A key
     * may temporarily map to {@code null}: {@code createConsumer} reserves the id with a null value
     * before the consumer state has been built.
     */
    private final Map<ConsumerInstanceId, KafkaConsumerState> consumers;
    /** Executor that runs read tasks, offset commits and asynchronous consumer closes. */
    private final ExecutorService executor;
    /** Optional factory for Kafka consumers; when null a {@code KafkaConsumer} is created directly. */
    private KafkaConsumerFactory consumerFactory;
    /** Read tasks waiting out a delay; drained by the read task scheduler thread. */
    final DelayQueue<RunnableReadTask> delayedReadTasks;
    /** Background thread that removes and closes expired consumers. */
    private final ExpirationThread expirationThread;
    /** Background thread that resubmits delayed read tasks to {@link #executor}. */
    private ReadTaskSchedulerThread readTaskSchedulerThread;

    /** Id of the internally created consumer used for offset lookups; null until first needed. */
    @GuardedBy("this")
    private ConsumerInstanceId adminConsumerInstanceId;

    /* loaded from: input_file:io/confluent/kafkarest/v2/KafkaConsumerManager$CommitCallback.class */
    /**
     * Callback notified when an offset commit submitted through {@code commitOffsets} finishes.
     */
    public interface CommitCallback {
        /**
         * Reports the outcome of a commit.
         *
         * @param list the committed offsets, or {@code null} when the commit failed
         * @param exc the failure, or {@code null} when the commit succeeded
         */
        void onCompletion(List<TopicPartitionOffset> list, Exception exc);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafkarest/v2/KafkaConsumerManager$ExpirationThread.class */
    /**
     * Daemon thread that, roughly once per second, removes every consumer that reports itself as
     * expired and closes it asynchronously on the manager's executor.
     */
    public class ExpirationThread extends Thread {
        /** Cleared by {@link #shutdown()} to make the run loop exit. */
        AtomicBoolean isRunning;
        /** Released exactly once, when {@link #run()} is about to return. */
        CountDownLatch shutdownLatch;

        public ExpirationThread() {
            super("Consumer Expiration Thread");
            this.isRunning = new AtomicBoolean(true);
            this.shutdownLatch = new CountDownLatch(1);
            setDaemon(true);
        }

        /**
         * Sweeps the consumer map while holding the manager's monitor, then sleeps for one second,
         * until {@link #shutdown()} is called.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                while (this.isRunning.get()) {
                    try {
                        synchronized (KafkaConsumerManager.this) {
                            long now = KafkaConsumerManager.this.time.milliseconds();
                            Iterator<KafkaConsumerState> it = KafkaConsumerManager.this.consumers.values().iterator();
                            while (it.hasNext()) {
                                final KafkaConsumerState kafkaConsumerState = it.next();
                                // A null value is a placeholder for a consumer that is still being created.
                                if (kafkaConsumerState != null && kafkaConsumerState.expired(now)) {
                                    KafkaConsumerManager.log.debug("Removing the expired consumer {}", kafkaConsumerState.getId());
                                    it.remove();
                                    // Close off-thread so the manager's monitor is not held during close().
                                    KafkaConsumerManager.this.executor.submit(new Runnable() {
                                        @Override // java.lang.Runnable
                                        public void run() {
                                            kafkaConsumerState.close();
                                        }
                                    });
                                }
                            }
                        }
                        Thread.sleep(1000L);
                    } catch (InterruptedException ignored) {
                        // shutdown() interrupts the sleep; the loop condition decides whether to exit.
                    }
                }
            } finally {
                // Always release the latch, even if the sweep dies with an unchecked exception,
                // so that shutdown() can never block forever.
                this.shutdownLatch.countDown();
            }
        }

        /**
         * Stops the run loop and waits until the thread has left {@link #run()}.
         *
         * @throws RuntimeException if the calling thread is interrupted while waiting; the
         *     interrupt status is restored and the original exception is attached as the cause
         */
        public void shutdown() {
            try {
                this.isRunning.set(false);
                interrupt();
                this.shutdownLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted when shutting down expiration thread.", e);
            }
        }
    }

    /* loaded from: input_file:io/confluent/kafkarest/v2/KafkaConsumerManager$KafkaConsumerFactory.class */
    /**
     * Factory hook for building the Kafka consumer; when none is supplied to the manager it
     * instantiates {@code KafkaConsumer} directly.
     */
    public interface KafkaConsumerFactory {
        /**
         * Creates a consumer from the given properties.
         *
         * @param properties the consumer properties assembled by {@code createConsumer}
         * @return the consumer to wrap in a consumer state
         */
        Consumer createConsumer(Properties properties);
    }

    /* loaded from: input_file:io/confluent/kafkarest/v2/KafkaConsumerManager$KafkaConsumerThreadPoolExecutor.class */
    /**
     * Thread pool that wraps submitted {@link RunnableReadTask}s in a {@link ReadFutureTask} so
     * the rejection handler can get back to the original read task.
     */
    class KafkaConsumerThreadPoolExecutor extends ThreadPoolExecutor {
        private KafkaConsumerThreadPoolExecutor(int i, int i2, long j, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, RejectedExecutionHandler rejectedExecutionHandler) {
            super(i, i2, j, timeUnit, blockingQueue, rejectedExecutionHandler);
        }

        /**
         * Returns a {@link ReadFutureTask} for read tasks and the default future for anything else.
         */
        @Override // java.util.concurrent.AbstractExecutorService
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T t) {
            if (!(runnable instanceof RunnableReadTask)) {
                return super.newTaskFor(runnable, t);
            }
            RunnableReadTask readTask = (RunnableReadTask) runnable;
            return new ReadFutureTask<>(readTask, t);
        }
    }

    /* loaded from: input_file:io/confluent/kafkarest/v2/KafkaConsumerManager$ReadFutureTask.class */
    /**
     * Future that remembers the {@link RunnableReadTask} it wraps, so the executor's rejection
     * handler can reschedule that task.
     */
    private class ReadFutureTask<V> extends FutureTask<V> {
        /** The wrapped read task. */
        private final RunnableReadTask readTask;

        private ReadFutureTask(RunnableReadTask runnableReadTask, V v) {
            super(runnableReadTask, v);
            this.readTask = runnableReadTask;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafkarest/v2/KafkaConsumerManager$ReadTaskSchedulerThread.class */
    /**
     * Daemon thread that takes read tasks off the delay queue once their delay has elapsed and
     * resubmits them to the manager's executor.
     */
    public class ReadTaskSchedulerThread extends Thread {
        /** Cleared by {@link #shutdown()} to make the run loop exit. */
        final AtomicBoolean isRunning;
        /** Released exactly once, when {@link #run()} is about to return. */
        final CountDownLatch shutdownLatch;

        ReadTaskSchedulerThread() {
            super("Read Task Scheduler Thread");
            this.isRunning = new AtomicBoolean(true);
            this.shutdownLatch = new CountDownLatch(1);
            setDaemon(true);
        }

        /**
         * Polls the delay queue in 500 ms slices until stopped or interrupted.
         *
         * <p>The try/finally wraps the whole loop so the latch is released only when the thread
         * really exits, and is released even if the loop body never runs.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                while (this.isRunning.get()) {
                    RunnableReadTask readTask = KafkaConsumerManager.this.delayedReadTasks.poll(500L, TimeUnit.MILLISECONDS);
                    if (readTask != null) {
                        KafkaConsumerManager.this.executor.submit(readTask);
                    }
                }
            } catch (InterruptedException ignored) {
                // Interrupted by shutdown(); fall through so the thread exits.
            } finally {
                this.shutdownLatch.countDown();
            }
        }

        /**
         * Stops the run loop and waits until the thread has left {@link #run()}.
         *
         * @throws RuntimeException if the calling thread is interrupted while waiting; the
         *     interrupt status is restored and the original exception is attached as the cause
         */
        public void shutdown() {
            try {
                this.isRunning.set(false);
                interrupt();
                this.shutdownLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted when shutting down read task scheduler thread.", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafkarest/v2/KafkaConsumerManager$ReadTaskState.class */
    /**
     * Immutable bundle of everything a {@link RunnableReadTask} needs: the read task itself, the
     * consumer state it reads from, and the callback to notify on failure.
     */
    public static class ReadTaskState {
        /** The read task that is executed in partial steps. */
        final KafkaConsumerReadTask task;
        /** The consumer the task reads from. */
        final KafkaConsumerState consumerState;
        /** Callback invoked by the runnable when a read step throws. */
        final ConsumerReadCallback callback;

        public ReadTaskState(KafkaConsumerReadTask kafkaConsumerReadTask, KafkaConsumerState kafkaConsumerState, ConsumerReadCallback consumerReadCallback) {
            this.task = kafkaConsumerReadTask;
            this.consumerState = kafkaConsumerState;
            this.callback = consumerReadCallback;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/kafkarest/v2/KafkaConsumerManager$RunnableReadTask.class */
    /**
     * One read request, executed as a series of partial reads. Between partial reads the task
     * parks itself in the manager's delay queue, which is why it implements {@link Delayed}.
     */
    public class RunnableReadTask implements Runnable, Delayed {
        private final ReadTaskState taskState;
        private final KafkaRestConfig consumerConfig;
        /** Wall-clock time (ms) at which this task was created. */
        private final long started;
        /** Wall-clock time (ms) after which the request is considered expired. */
        private final long requestExpiration;
        /** Delay (ms) between two partial reads. */
        private final int backoffMs;
        /** Wall-clock time (ms) at which the current delay ends; 0 until first delayed. */
        private long waitExpirationMs = 0;

        /**
         * @param readTaskState the task, consumer state and callback to operate on; the request
         *     timeout and backoff are read from the consumer state's configuration
         */
        public RunnableReadTask(ReadTaskState readTaskState) {
            this.taskState = readTaskState;
            this.started = KafkaConsumerManager.this.config.m1getTime().milliseconds();
            this.consumerConfig = readTaskState.consumerState.getConfig();
            this.requestExpiration = this.started + this.consumerConfig.getInt(KafkaRestConfig.CONSUMER_REQUEST_TIMEOUT_MS_CONFIG).intValue();
            this.backoffMs = this.consumerConfig.getInt(KafkaRestConfig.CONSUMER_ITERATOR_BACKOFF_MS_CONFIG).intValue();
        }

        /**
         * Re-queues this task to run again after {@code j} milliseconds, capped at the request
         * expiration. If the request has already expired the task is finished instead.
         *
         * @param j requested delay in milliseconds
         */
        public void delayFor(long j) {
            if (this.requestExpiration <= KafkaConsumerManager.this.config.m1getTime().milliseconds()) {
                this.taskState.task.finish();
                KafkaConsumerManager.log.trace("Finished executing  consumer read task ({}) due to request expiry", this.taskState.task);
            } else {
                this.waitExpirationMs = Math.min(j + KafkaConsumerManager.this.config.m1getTime().milliseconds(), this.requestExpiration);
                // The queue element is this task itself; no cast is needed (or legal) here.
                KafkaConsumerManager.this.delayedReadTasks.add(this);
            }
        }

        @Override
        public String toString() {
            return String.format("RunnableReadTask consumer id: %s; Read task: %s; Request expiration time: %d; Wait expiration: %d", this.taskState.consumerState.getId(), this.taskState.task, Long.valueOf(this.requestExpiration), Long.valueOf(this.waitExpirationMs));
        }

        /**
         * Performs one partial read, refreshes the consumer's expiration, and either stops (task
         * done) or backs off for another round. Any exception is logged and handed to the callback.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                KafkaConsumerManager.log.trace("Executing consumer read task ({})", this.taskState.task);
                this.taskState.task.doPartialRead();
                this.taskState.consumerState.updateExpiration();
                if (this.taskState.task.isDone()) {
                    KafkaConsumerManager.log.trace("Finished executing consumer read task ({})", this.taskState.task);
                } else {
                    delayFor(this.backoffMs);
                }
            } catch (Exception e) {
                KafkaConsumerManager.log.error("Failed to read records from consumer {} while executing read task ({}). {}", new Object[]{this.taskState.consumerState.getId().toString(), this.taskState.task, e});
                this.taskState.callback.onCompletion(null, e);
            }
        }

        /** Returns the time left until the current delay ends, converted to {@code timeUnit}. */
        @Override // java.util.concurrent.Delayed
        public long getDelay(TimeUnit timeUnit) {
            return timeUnit.convert(this.waitExpirationMs - KafkaConsumerManager.this.config.m1getTime().milliseconds(), TimeUnit.MILLISECONDS);
        }

        /**
         * Orders tasks by remaining delay in milliseconds.
         *
         * @throws NullPointerException if {@code delayed} is null
         */
        @Override // java.lang.Comparable
        public int compareTo(Delayed delayed) {
            if (delayed == null) {
                throw new NullPointerException("Delayed comparator cannot compare with null");
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), delayed.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    /**
     * Builds a manager from the given configuration and immediately starts the expiration and
     * read task scheduler threads.
     *
     * <p>The executor has no core threads, hands tasks off through a {@link SynchronousQueue}, and
     * treats a negative configured maximum as "unbounded". Rejected read tasks are re-queued with
     * a small random delay; other rejected tasks run on the submitting thread unless the pool is
     * shut down.
     */
    public KafkaConsumerManager(KafkaRestConfig kafkaRestConfig) {
        this.consumers = new HashMap<>();
        this.delayedReadTasks = new DelayQueue<>();
        this.adminConsumerInstanceId = null;
        this.config = kafkaRestConfig;
        this.time = kafkaRestConfig.m1getTime();
        this.bootstrapServers = RestConfigUtils.bootstrapBrokers(kafkaRestConfig);

        int configuredMaxThreads = kafkaRestConfig.getInt(KafkaRestConfig.CONSUMER_MAX_THREADS_CONFIG).intValue();
        int maxThreads;
        if (configuredMaxThreads < 0) {
            maxThreads = Integer.MAX_VALUE;
        } else {
            maxThreads = configuredMaxThreads;
        }

        RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandler() {
            @Override // java.util.concurrent.RejectedExecutionHandler
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                KafkaConsumerManager.log.debug("The runnable {} was rejected execution. The thread pool must be satured or shutiing down", runnable);
                if (runnable instanceof ReadFutureTask) {
                    // Retry the read a little later instead of running it on the caller's thread.
                    long retryDelayMs = ThreadLocalRandom.current().nextInt(25, 76);
                    ((ReadFutureTask<?>) runnable).readTask.delayFor(retryDelayMs);
                    return;
                }
                if (!threadPoolExecutor.isShutdown()) {
                    runnable.run();
                }
            }
        };
        this.executor = new KafkaConsumerThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), rejectionHandler);

        this.consumerFactory = null;
        this.expirationThread = new ExpirationThread();
        this.readTaskSchedulerThread = new ReadTaskSchedulerThread();
        this.expirationThread.start();
        this.readTaskSchedulerThread.start();
    }

    /**
     * Package-private constructor that additionally installs a consumer factory. The factory is
     * assigned after the delegated constructor has already started the background threads.
     *
     * @param kafkaRestConfig the manager configuration
     * @param kafkaConsumerFactory factory used by {@code createConsumer} instead of instantiating
     *     {@code KafkaConsumer} directly
     */
    KafkaConsumerManager(KafkaRestConfig kafkaRestConfig, KafkaConsumerFactory kafkaConsumerFactory) {
        this(kafkaRestConfig);
        this.consumerFactory = kafkaConsumerFactory;
    }

    /**
     * Creates a Kafka consumer in the given group and registers it with this manager.
     *
     * <p>The instance name is the configured id if present, otherwise the configured name,
     * otherwise a generated value of the form {@code rest-consumer-[<server id>-]<uuid>}. The
     * instance id is reserved in the consumer map (with a {@code null} value) before the consumer
     * is built, and the reservation is removed again if anything fails.
     *
     * @param str the consumer group
     * @param consumerInstanceConfig the requested instance settings (name/id, format, auto-commit,
     *     offset reset)
     * @return the name of the created instance
     * @throws RuntimeException the exception produced by {@code Errors.consumerAlreadyExistsException()}
     *     when the instance id is already registered, or by
     *     {@code Errors.invalidConsumerConfigException} when Kafka rejects the configuration
     */
    public String createConsumer(String str, ConsumerInstanceConfig consumerInstanceConfig) {
        String name = consumerInstanceConfig.getName();
        if (consumerInstanceConfig.getId() != null) {
            name = consumerInstanceConfig.getId();
        }
        if (name == null) {
            StringBuilder generatedName = new StringBuilder("rest-consumer-");
            String serverId = this.config.getString(KafkaRestConfig.ID_CONFIG);
            if (!serverId.isEmpty()) {
                generatedName.append(serverId).append("-");
            }
            name = generatedName.append(UUID.randomUUID().toString()).toString();
        }

        ConsumerInstanceId consumerInstanceId = new ConsumerInstanceId(str, name);
        synchronized (this) {
            if (this.consumers.containsKey(consumerInstanceId)) {
                throw Errors.consumerAlreadyExistsException();
            }
            // Reserve the id so a concurrent request for the same instance fails fast.
            this.consumers.put(consumerInstanceId, null);
        }

        boolean succeeded = false;
        try {
            log.debug("Creating consumer " + name + " in group " + str);
            Properties consumerProperties = this.config.getConsumerProperties();
            consumerProperties.setProperty(KafkaRestConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
            consumerProperties.setProperty(KafkaRestConfig.MAX_POLL_RECORDS_CONFIG, KafkaRestConfig.MAX_POLL_RECORDS_VALUE);
            consumerProperties.setProperty("group.id", str);
            if (consumerInstanceConfig.getId() != null) {
                consumerProperties.setProperty("consumer.id", consumerInstanceConfig.getId());
            }
            if (consumerInstanceConfig.getAutoCommitEnable() != null) {
                consumerProperties.setProperty("enable.auto.commit", consumerInstanceConfig.getAutoCommitEnable());
            }
            if (consumerInstanceConfig.getAutoOffsetReset() != null) {
                consumerProperties.setProperty("auto.offset.reset", consumerInstanceConfig.getAutoOffsetReset());
            }
            consumerProperties.setProperty("request.timeout.ms", "30000");
            consumerProperties.setProperty(KafkaRestConfig.SCHEMA_REGISTRY_URL_CONFIG, this.config.getString(KafkaRestConfig.SCHEMA_REGISTRY_URL_CONFIG));

            // Key and value always share the same deserializer, chosen by embedded format.
            String deserializer;
            switch (consumerInstanceConfig.getFormat()) {
                case AVRO:
                    deserializer = "io.confluent.kafka.serializers.KafkaAvroDeserializer";
                    break;
                case JSONSCHEMA:
                    deserializer = "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer";
                    break;
                case PROTOBUF:
                    deserializer = "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer";
                    break;
                case JSON:
                case BINARY:
                default:
                    deserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
                    break;
            }
            consumerProperties.put("key.deserializer", deserializer);
            consumerProperties.put("value.deserializer", deserializer);

            try {
                Consumer consumer = this.consumerFactory == null ? new KafkaConsumer(consumerProperties) : this.consumerFactory.createConsumer(consumerProperties);
                KafkaConsumerState consumerState = createConsumerState(consumerInstanceConfig, consumerInstanceId, consumer);
                synchronized (this) {
                    this.consumers.put(consumerInstanceId, consumerState);
                }
            } catch (ConfigException e) {
                throw Errors.invalidConsumerConfigException(e.getMessage());
            }
            succeeded = true;
            return name;
        } finally {
            if (!succeeded) {
                // Drop the reservation so the instance id can be reused after a failure.
                synchronized (this) {
                    this.consumers.remove(consumerInstanceId);
                }
            }
        }
    }

    /**
     * Wraps a Kafka consumer in the consumer-state implementation matching the requested
     * embedded format.
     *
     * @param consumerInstanceConfig instance settings; supplies the embedded format
     * @param consumerInstanceId id the state is created for
     * @param consumer the underlying Kafka consumer
     * @return the format-specific consumer state
     * @throws RestServerErrorException if the format is none of the supported ones
     */
    private KafkaConsumerState createConsumerState(ConsumerInstanceConfig consumerInstanceConfig, ConsumerInstanceId consumerInstanceId, Consumer consumer) throws RestServerErrorException {
        KafkaRestConfig perConsumerConfig = KafkaRestConfig.newConsumerConfig(this.config, consumerInstanceConfig);
        KafkaConsumerState state;
        switch (consumerInstanceConfig.getFormat()) {
            case AVRO:
                state = new SchemaKafkaConsumerState(perConsumerConfig, consumerInstanceId, consumer, new AvroConverter());
                break;
            case JSONSCHEMA:
                state = new SchemaKafkaConsumerState(perConsumerConfig, consumerInstanceId, consumer, new JsonSchemaConverter());
                break;
            case PROTOBUF:
                state = new SchemaKafkaConsumerState(perConsumerConfig, consumerInstanceId, consumer, new ProtobufConverter());
                break;
            case JSON:
                state = new JsonKafkaConsumerState(perConsumerConfig, consumerInstanceId, consumer);
                break;
            case BINARY:
                state = new BinaryKafkaConsumerState(perConsumerConfig, consumerInstanceId, consumer);
                break;
            default:
                String message = String.format("Invalid embedded format %s for new consumer.", consumerInstanceConfig.getFormat());
                throw new RestServerErrorException(message, Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
        }
        return state;
    }

    /**
     * Starts an asynchronous read from the given consumer instance.
     *
     * <p>If the instance does not exist, or is not of the expected state class, the callback is
     * completed immediately with the corresponding error and nothing is submitted.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @param cls the consumer state class the caller expects for its format
     * @param j first numeric argument forwarded to {@code KafkaConsumerReadTask}
     * @param j2 second numeric argument forwarded to {@code KafkaConsumerReadTask}
     * @param consumerReadCallback receives the result or the error
     */
    public <KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> void readRecords(String str, String str2, Class<? extends KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>> cls, long j, long j2, ConsumerReadCallback<ClientKeyT, ClientValueT> consumerReadCallback) {
        try {
            KafkaConsumerState<?, ?, ?, ?> state = getConsumerInstance(str, str2);
            if (cls.isInstance(state)) {
                KafkaConsumerReadTask readTask = new KafkaConsumerReadTask(state, j, j2, consumerReadCallback);
                ReadTaskState taskState = new ReadTaskState(readTask, state, consumerReadCallback);
                this.executor.submit(new RunnableReadTask(taskState));
            } else {
                consumerReadCallback.onCompletion(null, Errors.consumerFormatMismatch());
            }
        } catch (RestNotFoundException notFound) {
            consumerReadCallback.onCompletion(null, notFound);
        }
    }

    /**
     * Commits offsets for a consumer instance on the manager's executor.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @param str3 value forwarded unchanged to the consumer state's {@code commitOffsets}
     * @param consumerOffsetCommitRequest the offsets to commit
     * @param commitCallback notified with the committed offsets or with the failure
     * @return the future of the submitted commit, or {@code null} when the instance was not found
     *     (the callback has then already been completed with the error)
     */
    public Future commitOffsets(String str, String str2, final String str3, final ConsumerOffsetCommitRequest consumerOffsetCommitRequest, final CommitCallback commitCallback) {
        try {
            final KafkaConsumerState<?, ?, ?, ?> state = getConsumerInstance(str, str2);
            Runnable commitTask = new Runnable() {
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        List<TopicPartitionOffset> committedOffsets = state.commitOffsets(str3, consumerOffsetCommitRequest);
                        commitCallback.onCompletion(committedOffsets, null);
                    } catch (Exception failure) {
                        KafkaConsumerManager.log.error("Failed to commit offsets for consumer " + state.getId().toString(), failure);
                        commitCallback.onCompletion(null, failure);
                    } finally {
                        // Committing counts as activity regardless of the outcome.
                        state.updateExpiration();
                    }
                }

                public String toString() {
                    return String.format("OffsetCommit consumer id: %s; Async: %s;", state.getId(), str3);
                }
            };
            return this.executor.submit(commitTask);
        } catch (RestNotFoundException notFound) {
            commitCallback.onCompletion(null, notFound);
            return null;
        }
    }

    /**
     * Looks up committed offsets through the given consumer instance.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @param consumerCommittedRequest the partitions to query
     * @return the consumer's answer, or an empty response if no instance was returned
     */
    public ConsumerCommittedResponse committed(String str, String str2, ConsumerCommittedRequest consumerCommittedRequest) {
        log.debug("Committed offsets for consumer " + str2 + " in group " + str);
        KafkaConsumerState<?, ?, ?, ?> state = getConsumerInstance(str, str2);
        if (state == null) {
            return new ConsumerCommittedResponse(new ArrayList());
        }
        return state.committed(consumerCommittedRequest);
    }

    /**
     * Returns the beginning offset of a partition, queried through the admin consumer.
     *
     * @param str the topic
     * @param i the partition
     */
    public long getBeginningOffset(String str, int i) {
        log.debug("Beginning offset for topic {} and partition {}.", str, Integer.valueOf(i));
        KafkaConsumerState<?, ?, ?, ?> adminConsumer = getAdminConsumerInstance();
        return adminConsumer.getBeginningOffset(str, i);
    }

    /**
     * Returns the end offset of a partition, queried through the admin consumer.
     *
     * @param str the topic
     * @param i the partition
     */
    public long getEndOffset(String str, int i) {
        log.debug("End offset for topic {} and partition {}.", str, Integer.valueOf(i));
        KafkaConsumerState<?, ?, ?, ?> adminConsumer = getAdminConsumerInstance();
        return adminConsumer.getEndOffset(str, i);
    }

    /**
     * Asks the admin consumer for the offset of a partition at the given timestamp.
     *
     * @param str the topic
     * @param i the partition
     * @param instant the timestamp to look up
     * @return whatever the admin consumer reports for that timestamp
     */
    public Optional<Long> getOffsetForTime(String str, int i, Instant instant) {
        Object[] logArgs = {str, Integer.valueOf(i), instant};
        log.debug("Offset for topic {} and partition {} at timestamp {}.", logArgs);
        KafkaConsumerState<?, ?, ?, ?> adminConsumer = getAdminConsumerInstance();
        return adminConsumer.getOffsetForTime(str, i, instant);
    }

    /**
     * Generates a unique group name for the admin consumer, embedding the configured server id
     * when one is set.
     */
    private String createAdminConsumerGroup() {
        String serverId = this.config.getString(KafkaRestConfig.ID_CONFIG);
        if (serverId.isEmpty()) {
            return String.format("rest-consumer-group-%s", UUID.randomUUID().toString());
        }
        return String.format("rest-consumer-group-%s-%s", serverId, UUID.randomUUID().toString());
    }

    /**
     * Returns the admin consumer, creating a new one when none has been created yet or the
     * previous one is no longer registered.
     */
    private synchronized KafkaConsumerState<?, ?, ?, ?> getAdminConsumerInstance() {
        boolean stillRegistered = this.adminConsumerInstanceId != null
                && this.consumers.containsKey(this.adminConsumerInstanceId);
        if (!stillRegistered) {
            this.adminConsumerInstanceId = createAdminConsumerInstance();
        }
        return getConsumerInstance(this.adminConsumerInstanceId);
    }

    /**
     * Creates a binary-format consumer in a freshly generated group and returns its id.
     */
    private ConsumerInstanceId createAdminConsumerInstance() {
        String adminGroup = createAdminConsumerGroup();
        ConsumerInstanceConfig binaryConfig = ConsumerInstanceConfig.create(EmbeddedFormat.BINARY);
        String adminInstance = createConsumer(adminGroup, binaryConfig);
        return new ConsumerInstanceId(adminGroup, adminInstance);
    }

    /**
     * Unregisters the given consumer instance and closes it.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     */
    public void deleteConsumer(String str, String str2) {
        log.debug("Destroying consumer " + str2 + " in group " + str);
        KafkaConsumerState<?, ?, ?, ?> removed = getConsumerInstance(str, str2, true);
        removed.close();
    }

    /**
     * Applies a subscription to the given consumer instance.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @param consumerSubscriptionRecord the subscription to apply
     */
    public void subscribe(String str, String str2, ConsumerSubscriptionRecord consumerSubscriptionRecord) {
        log.debug("Subscribing consumer " + str2 + " in group " + str);
        KafkaConsumerState<?, ?, ?, ?> state = getConsumerInstance(str, str2);
        if (state == null) {
            return;
        }
        state.subscribe(consumerSubscriptionRecord);
    }

    /**
     * Removes the current subscription of the given consumer instance.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     */
    public void unsubscribe(String str, String str2) {
        log.debug("Unsubcribing consumer " + str2 + " in group " + str);
        KafkaConsumerState<?, ?, ?, ?> state = getConsumerInstance(str, str2);
        if (state == null) {
            return;
        }
        state.unsubscribe();
    }

    /**
     * Returns the current subscription of the given consumer instance.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @return a copy of the consumer's subscription, or an empty response if no instance was
     *     returned
     */
    public ConsumerSubscriptionResponse subscription(String str, String str2) {
        KafkaConsumerState<?, ?, ?, ?> state = getConsumerInstance(str, str2);
        if (state == null) {
            return new ConsumerSubscriptionResponse(new ArrayList());
        }
        ArrayList topics = new ArrayList(state.subscription());
        return new ConsumerSubscriptionResponse(topics);
    }

    /**
     * Forwards a seek-to-beginning request to the given consumer instance.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @param consumerSeekToRequest the partitions to seek
     */
    public void seekToBeginning(String str, String str2, ConsumerSeekToRequest consumerSeekToRequest) {
        log.debug("seeking to beginning " + str2 + " in group " + str);
        KafkaConsumerState<?, ?, ?, ?> state = getConsumerInstance(str, str2);
        if (state == null) {
            return;
        }
        state.seekToBeginning(consumerSeekToRequest);
    }

    /**
     * Forwards a seek-to-end request to the given consumer instance.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @param consumerSeekToRequest the partitions to seek
     */
    public void seekToEnd(String str, String str2, ConsumerSeekToRequest consumerSeekToRequest) {
        log.debug("seeking to end " + str2 + " in group " + str);
        KafkaConsumerState<?, ?, ?, ?> state = getConsumerInstance(str, str2);
        if (state == null) {
            return;
        }
        state.seekToEnd(consumerSeekToRequest);
    }

    /**
     * Forwards a seek-to-offset request to the given consumer instance.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @param consumerSeekToOffsetRequest the partitions and target offsets
     */
    public void seekToOffset(String str, String str2, ConsumerSeekToOffsetRequest consumerSeekToOffsetRequest) {
        log.debug("seeking to offset " + str2 + " in group " + str);
        KafkaConsumerState<?, ?, ?, ?> state = getConsumerInstance(str, str2);
        if (state == null) {
            return;
        }
        state.seekToOffset(consumerSeekToOffsetRequest);
    }

    /**
     * Manually assigns partitions to the given consumer instance.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @param consumerAssignmentRequest the partitions to assign
     */
    public void assign(String str, String str2, ConsumerAssignmentRequest consumerAssignmentRequest) {
        // The previous message was copied from seekToEnd and mislabelled this operation in the logs.
        log.debug("assigning partitions to " + str2 + " in group " + str);
        KafkaConsumerState<?, ?, ?, ?> consumerInstance = getConsumerInstance(str, str2);
        if (consumerInstance != null) {
            consumerInstance.assign(consumerAssignmentRequest);
        }
    }

    /**
     * Returns the partitions currently assigned to the given consumer instance, converted to the
     * v2 REST entity type.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @return the assignment; empty if no instance was returned
     */
    public ConsumerAssignmentResponse assignment(String str, String str2) {
        log.debug("getting assignment for  " + str2 + " in group " + str);
        Vector partitions = new Vector();
        KafkaConsumerState<?, ?, ?, ?> state = getConsumerInstance(str, str2);
        if (state == null) {
            return new ConsumerAssignmentResponse(partitions);
        }
        for (TopicPartition assigned : state.assignment()) {
            String topic = assigned.topic();
            Integer partition = Integer.valueOf(assigned.partition());
            partitions.add(new io.confluent.kafkarest.entities.v2.TopicPartition(topic, partition));
        }
        return new ConsumerAssignmentResponse(partitions);
    }

    /**
     * Shuts the manager down: stops accepting executor tasks, stops both background threads
     * (waiting for each to exit), then closes and forgets every registered consumer.
     */
    public void shutdown() {
        log.debug("Shutting down consumers");
        this.executor.shutdown();
        log.trace("Shutting down consumer expiration thread");
        this.expirationThread.shutdown();
        this.readTaskSchedulerThread.shutdown();
        synchronized (this) {
            for (KafkaConsumerState consumerState : this.consumers.values()) {
                // createConsumer() parks a null placeholder while a consumer is being built;
                // there is nothing to close for such an entry.
                if (consumerState != null) {
                    consumerState.close();
                }
            }
            this.consumers.clear();
            // The executor was already shut down above; a second shutdown() would be a no-op.
        }
    }

    /**
     * Looks up a consumer instance and refreshes its expiration.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @param z when true the instance is also removed from the registry
     * @return the consumer state; never {@code null}
     * @throws RuntimeException the exception produced by
     *     {@code Errors.consumerInstanceNotFoundException()} when no state is registered
     */
    private synchronized KafkaConsumerState<?, ?, ?, ?> getConsumerInstance(String str, String str2, boolean z) {
        ConsumerInstanceId key = new ConsumerInstanceId(str, str2);
        KafkaConsumerState<?, ?, ?, ?> state;
        if (z) {
            state = this.consumers.remove(key);
        } else {
            state = this.consumers.get(key);
        }
        if (state != null) {
            state.updateExpiration();
            return state;
        }
        throw Errors.consumerInstanceNotFoundException();
    }

    /**
     * Looks up a consumer instance without removing it from the registry.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @return the registered consumer state
     */
    KafkaConsumerState<?, ?, ?, ?> getConsumerInstance(String str, String str2) {
        return getConsumerInstance(str, str2, false);
    }

    /**
     * Looks up a consumer instance by id, using the id's group and instance name.
     *
     * @param consumerInstanceId the id to look up
     * @return the registered consumer state
     */
    private KafkaConsumerState<?, ?, ?, ?> getConsumerInstance(ConsumerInstanceId consumerInstanceId) {
        return getConsumerInstance(consumerInstanceId.getGroup(), consumerInstanceId.getInstance());
    }
}
