package io.confluent.kafkarest;

import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.rest.RestConfigException;
import io.confluent.rest.exceptions.RestException;
import io.confluent.rest.exceptions.RestNotFoundException;
import io.confluent.rest.exceptions.RestServerErrorException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.ws.rs.core.Response;
import kafka.common.InvalidConfigException;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:57)
    */
/* loaded from: input_file:io/confluent/kafkarest/ConsumerManager.class */
public class ConsumerManager {
    // Class-wide logger.
    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);
    // Proxy-level configuration handed to the constructor; cloned as the base for every new consumer.
    private final KafkaRestConfig config;
    // Clock taken from the configuration; used by the expiration thread to test consumer expiry.
    private final Time time;
    // Value of ZOOKEEPER_CONNECT_CONFIG, copied into each new consumer's properties.
    private final String zookeeperConnect;
    // Used by readTopic to check that the requested topic exists.
    private final MetadataObserver mdObserver;
    // Value of CONSUMER_ITERATOR_TIMEOUT_MS_CONFIG; becomes each consumer's "consumer.timeout.ms".
    private final int iteratorTimeoutMs;
    // Registered consumers. Every access in this class happens while holding this manager's monitor.
    // A key may temporarily map to null while createConsumer is still building the consumer.
    private final Map<ConsumerInstanceId, ConsumerState> consumers;
    // Runs read tasks, offset commits and the closing of expired consumers.
    private final ExecutorService executor;
    // Optional factory for consumer connectors; when null, Consumer.createJavaConsumerConnector is used.
    private ConsumerFactory consumerFactory;
    // Read tasks waiting for their delay to elapse before being handed back to the executor.
    final DelayQueue<RunnableReadTask> delayedReadTasks;
    // Moves tasks from delayedReadTasks back onto the executor.
    private final ReadTaskSchedulerThread readTaskSchedulerThread;
    // Periodically removes and closes expired consumers.
    private final ExpirationThread expirationThread;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.confluent.kafkarest.ConsumerManager$1 */
    /* loaded from: input_file:io/confluent/kafkarest/ConsumerManager$1.class */
    /**
     * Rejection policy for the consumer executor.
     *
     * <p>A rejected {@link RunnableReadTask} is not dropped: it is given a short randomized delay
     * (25-75 ms from now) and parked in {@code delayedReadTasks} so it can be resubmitted later.
     * Any other rejected task is executed directly in the submitting thread, unless the executor
     * has already been shut down, in which case it is discarded.
     */
    public class AnonymousClass1 implements RejectedExecutionHandler {
        final /* synthetic */ KafkaRestConfig val$config;

        /**
         * @param kafkaRestConfig configuration whose clock is used to compute the retry time
         */
        AnonymousClass1(KafkaRestConfig kafkaRestConfig) {
            this.val$config = kafkaRestConfig;
        }

        /**
         * Handles a task the executor refused to accept.
         *
         * @param runnable the rejected task
         * @param threadPoolExecutor the executor that rejected it
         */
        @Override // java.util.concurrent.RejectedExecutionHandler
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
            if (runnable instanceof RunnableReadTask) {
                RunnableReadTask readTask = (RunnableReadTask) runnable;
                long nowMs = this.val$config.m6getTime().milliseconds();
                // Random jitter so that rejected tasks do not all retry at the same instant.
                // The private field is reachable because both classes are nested in ConsumerManager.
                readTask.waitExpirationMs = nowMs + ThreadLocalRandom.current().nextInt(25, 76);
                ConsumerManager.this.delayedReadTasks.add(readTask);
                return;
            }
            if (!threadPoolExecutor.isShutdown()) {
                runnable.run();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.confluent.kafkarest.ConsumerManager$2 */
    /* loaded from: input_file:io/confluent/kafkarest/ConsumerManager$2.class */
    public class AnonymousClass2 implements Runnable {
        final /* synthetic */ ConsumerState val$state;
        final /* synthetic */ CommitCallback val$callback;

        AnonymousClass2(ConsumerState consumerState, CommitCallback commitCallback) {
            r5 = consumerState;
            r6 = commitCallback;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    r6.onCompletion(r5.commitOffsets(), null);
                    r5.updateExpiration();
                } catch (Exception e) {
                    ConsumerManager.log.error("Failed to commit offsets for consumer " + r5.getId().toString(), e);
                    RestServerErrorException restServerErrorException = e;
                    if (!(e instanceof RestException)) {
                        restServerErrorException = Errors.kafkaErrorException(e);
                    }
                    r6.onCompletion(null, restServerErrorException);
                    r5.updateExpiration();
                }
            } catch (Throwable th) {
                r5.updateExpiration();
                throw th;
            }
        }
    }

    /* loaded from: input_file:io/confluent/kafkarest/ConsumerManager$CommitCallback.class */
    /** Receives the outcome of an offset commit requested through {@code commitOffsets}. */
    public interface CommitCallback {
        /**
         * Called when a commit attempt has finished.
         *
         * @param list the committed offsets, or {@code null} when the commit failed
         * @param exc the failure, or {@code null} when the commit succeeded
         */
        void onCompletion(List<TopicPartitionOffset> list, Exception exc);
    }

    /* loaded from: input_file:io/confluent/kafkarest/ConsumerManager$ConsumerFactory.class */
    /**
     * Pluggable source of consumer connectors. When a factory is supplied to the manager it is
     * used by {@code createConsumer} instead of {@code Consumer.createJavaConsumerConnector}.
     */
    public interface ConsumerFactory {
        /**
         * Creates a connector for the given consumer configuration.
         *
         * @param consumerConfig configuration of the consumer being created
         * @return the connector to use for that consumer
         */
        ConsumerConnector createConsumer(ConsumerConfig consumerConfig);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafkarest/ConsumerManager$ExpirationThread.class */
    public class ExpirationThread extends Thread {
        AtomicBoolean isRunning;
        CountDownLatch shutdownLatch;

        /* renamed from: io.confluent.kafkarest.ConsumerManager$ExpirationThread$1 */
        /* loaded from: input_file:io/confluent/kafkarest/ConsumerManager$ExpirationThread$1.class */
        class AnonymousClass1 implements Runnable {
            final /* synthetic */ ConsumerState val$state;

            AnonymousClass1(ConsumerState consumerState) {
                r5 = consumerState;
            }

            @Override // java.lang.Runnable
            public void run() {
                r5.close();
            }
        }

        public ExpirationThread() {
            super("Consumer Expiration Thread");
            this.isRunning = new AtomicBoolean(true);
            this.shutdownLatch = new CountDownLatch(1);
            setDaemon(true);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.isRunning.get()) {
                try {
                    synchronized (ConsumerManager.this) {
                        long milliseconds = ConsumerManager.this.time.milliseconds();
                        Iterator it = ConsumerManager.this.consumers.values().iterator();
                        while (it.hasNext()) {
                            ConsumerState consumerState = (ConsumerState) it.next();
                            if (consumerState != null && consumerState.expired(milliseconds)) {
                                ConsumerManager.log.debug("Removing the expired consumer {}", consumerState.getId());
                                it.remove();
                                ConsumerManager.this.executor.submit(new Runnable() { // from class: io.confluent.kafkarest.ConsumerManager.ExpirationThread.1
                                    final /* synthetic */ ConsumerState val$state;

                                    AnonymousClass1(ConsumerState consumerState2) {
                                        r5 = consumerState2;
                                    }

                                    @Override // java.lang.Runnable
                                    public void run() {
                                        r5.close();
                                    }
                                });
                            }
                        }
                    }
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                }
            }
            this.shutdownLatch.countDown();
        }

        public void shutdown() {
            try {
                this.isRunning.set(false);
                interrupt();
                this.shutdownLatch.await();
            } catch (InterruptedException e) {
                throw new Error("Interrupted when shutting down consumer worker thread.");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafkarest/ConsumerManager$ReadTaskSchedulerThread.class */
    /**
     * Daemon thread that takes read tasks whose delay has elapsed out of
     * {@code delayedReadTasks} and submits them to the executor again.
     */
    public class ReadTaskSchedulerThread extends Thread {
        // Cleared by shutdown() to make the loop in run() exit.
        AtomicBoolean isRunning;
        // Released when run() has finished, so shutdown() can wait for the thread to stop.
        CountDownLatch shutdownLatch;

        ReadTaskSchedulerThread() {
            super("Read Task Scheduler Thread");
            this.isRunning = new AtomicBoolean(true);
            this.shutdownLatch = new CountDownLatch(1);
            setDaemon(true);
        }

        /**
         * Polls the delay queue in 500 ms slices and resubmits every task that becomes due. Ends
         * when the running flag is cleared or the thread is interrupted.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                while (this.isRunning.get()) {
                    // Bounded wait so the running flag is re-checked at least every 500 ms.
                    RunnableReadTask dueTask = ConsumerManager.this.delayedReadTasks.poll(500L, TimeUnit.MILLISECONDS);
                    if (dueTask != null) {
                        ConsumerManager.this.executor.submit(dueTask);
                    }
                }
            } catch (InterruptedException ignored) {
                // Interrupted by shutdown(): fall through and let the thread exit.
            } finally {
                this.shutdownLatch.countDown();
            }
        }

        /**
         * Stops the thread and blocks until it has exited.
         *
         * @throws RuntimeException if the calling thread is interrupted while waiting; the
         *     interrupt flag is restored and the {@link InterruptedException} is attached as the cause
         */
        public void shutdown() {
            try {
                this.isRunning.set(false);
                interrupt();
                this.shutdownLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted when shutting down read task scheduler thread.", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafkarest/ConsumerManager$ReadTaskState.class */
    /** Immutable bundle of everything a {@code RunnableReadTask} needs for one read request. */
    public static class ReadTaskState {
        // The read operation itself.
        final ConsumerReadTask task;
        // The consumer the read is performed on.
        final ConsumerState consumerState;
        // Notified when the read task fails with an exception.
        final ConsumerReadCallback callback;

        /**
         * @param consumerReadTask the read operation
         * @param consumerState the consumer being read from
         * @param consumerReadCallback callback for reporting failures of the read
         */
        public ReadTaskState(ConsumerReadTask consumerReadTask, ConsumerState consumerState, ConsumerReadCallback consumerReadCallback) {
            this.task = consumerReadTask;
            this.consumerState = consumerState;
            this.callback = consumerReadCallback;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/kafkarest/ConsumerManager$RunnableReadTask.class */
    /**
     * Executor task that advances a consumer read in steps.
     *
     * <p>Each run performs one partial read. If the read is still not done, the task computes
     * when it may run again and parks itself in {@code delayedReadTasks}; the {@link Delayed}
     * implementation exposes that wait to the queue.
     */
    public class RunnableReadTask implements Runnable, Delayed {
        private final ReadTaskState taskState;
        // Per-consumer configuration, taken from the consumer state.
        private final KafkaRestConfig consumerConfig;
        // Creation time of this task in milliseconds.
        private final long started;
        // Creation time plus CONSUMER_REQUEST_TIMEOUT_MS_CONFIG; upper bound for any backoff wait.
        private final long requestExpiration;
        // Time (ms) before which this task should not be run again; 0 means no wait.
        private long waitExpirationMs = 0;

        /**
         * @param readTaskState the read operation, its consumer and its callback
         */
        public RunnableReadTask(ReadTaskState readTaskState) {
            this.taskState = readTaskState;
            this.started = ConsumerManager.this.config.m6getTime().milliseconds();
            this.consumerConfig = readTaskState.consumerState.getConfig();
            this.requestExpiration = this.started + this.consumerConfig.getInt(KafkaRestConfig.CONSUMER_REQUEST_TIMEOUT_MS_CONFIG);
        }

        /**
         * Performs one partial read and re-queues the task with a backoff if the read is not yet
         * done. Any exception is logged and passed to the callback as a {@link RestException}.
         */
        @Override // java.lang.Runnable
        public void run() {
            try {
                ConsumerManager.log.trace("Executing consumer read task ({})", this.taskState.task);
                if (this.taskState.task.isDone()) {
                    return;
                }
                this.taskState.task.doPartialRead();
                this.taskState.consumerState.updateExpiration();
                if (this.taskState.task.isDone()) {
                    ConsumerManager.log.trace("Finished executing consumer read task ({})", this.taskState.task);
                } else {
                    // Back off before the next attempt, but never beyond the request's own deadline.
                    long backoffUntilMs = ConsumerManager.this.config.m6getTime().milliseconds()
                            + this.consumerConfig.getInt(KafkaRestConfig.CONSUMER_ITERATOR_BACKOFF_MS_CONFIG);
                    this.waitExpirationMs = Math.min(backoffUntilMs, this.requestExpiration);
                    ConsumerManager.this.delayedReadTasks.add(this);
                }
            } catch (Exception e) {
                ConsumerManager.log.error("Failed to read records consumer " + this.taskState.consumerState.getId().toString(), e);
                RestException reported;
                if (e instanceof RestException) {
                    reported = (RestException) e;
                } else {
                    reported = Errors.kafkaErrorException(e);
                }
                this.taskState.callback.onCompletion(null, reported);
            }
        }

        /**
         * @param timeUnit unit of the returned value
         * @return time remaining until {@code waitExpirationMs}; zero or negative once it has passed
         */
        @Override // java.util.concurrent.Delayed
        public long getDelay(TimeUnit timeUnit) {
            return timeUnit.convert(this.waitExpirationMs - ConsumerManager.this.config.m6getTime().milliseconds(), TimeUnit.MILLISECONDS);
        }

        /**
         * Orders tasks by remaining delay, shortest first.
         *
         * @throws NullPointerException if {@code delayed} is null
         */
        @Override // java.lang.Comparable
        public int compareTo(Delayed delayed) {
            if (delayed == null) {
                throw new NullPointerException("Delayed comparator cannot compare with null");
            }
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), delayed.getDelay(TimeUnit.MILLISECONDS));
        }

        /**
         * Compiler-generated accessor that lets sibling classes assign {@code waitExpirationMs}.
         * The decompiler could not decode it and emitted a stub that always threw; it is
         * reconstructed here from the partially decoded body (assign the field, return the value).
         *
         * <p>NOTE(review): a static method declared in a non-static inner class is only accepted by
         * Java 16 and later compilers - confirm the build's source level.
         *
         * @param task the task to update
         * @param value the new wait expiration in milliseconds
         * @return the value that was assigned
         */
        static /* synthetic */ long access$002(io.confluent.kafkarest.ConsumerManager.RunnableReadTask task, long value) {
            task.waitExpirationMs = value;
            return value;
        }
    }

    /**
     * Creates a manager that builds consumers with {@code Consumer.createJavaConsumerConnector}
     * and starts its two background threads (consumer expiration and read-task scheduling).
     *
     * <p>The executor has no core threads, hands tasks directly to a worker (no queueing) and
     * grows up to {@code CONSUMER_MAX_THREADS_CONFIG} threads; a negative setting means no limit.
     * When no worker is available, read tasks are parked in {@code delayedReadTasks} with a
     * random 25-75 ms delay and other tasks run in the submitting thread (or are dropped if the
     * executor is already shut down).
     *
     * @param kafkaRestConfig proxy configuration
     * @param metadataObserver used to check that topics exist
     */
    public ConsumerManager(final KafkaRestConfig kafkaRestConfig, MetadataObserver metadataObserver) {
        this.consumers = new HashMap<>();
        this.delayedReadTasks = new DelayQueue<>();
        this.config = kafkaRestConfig;
        this.time = kafkaRestConfig.m6getTime();
        this.zookeeperConnect = kafkaRestConfig.getString(KafkaRestConfig.ZOOKEEPER_CONNECT_CONFIG);
        this.mdObserver = metadataObserver;
        this.iteratorTimeoutMs = kafkaRestConfig.getInt(KafkaRestConfig.CONSUMER_ITERATOR_TIMEOUT_MS_CONFIG);

        int configuredMaxThreads = kafkaRestConfig.getInt(KafkaRestConfig.CONSUMER_MAX_THREADS_CONFIG);
        int maxThreads = configuredMaxThreads < 0 ? Integer.MAX_VALUE : configuredMaxThreads;
        this.executor = new ThreadPoolExecutor(0, maxThreads, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new RejectedExecutionHandler() { // from class: io.confluent.kafkarest.ConsumerManager.1
            @Override // java.util.concurrent.RejectedExecutionHandler
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                if (runnable instanceof RunnableReadTask) {
                    RunnableReadTask readTask = (RunnableReadTask) runnable;
                    long nowMs = kafkaRestConfig.m6getTime().milliseconds();
                    // Random jitter so that rejected read tasks do not all retry at the same instant.
                    readTask.waitExpirationMs = nowMs + ThreadLocalRandom.current().nextInt(25, 76);
                    ConsumerManager.this.delayedReadTasks.add(readTask);
                    return;
                }
                if (!threadPoolExecutor.isShutdown()) {
                    runnable.run();
                }
            }
        });
        this.consumerFactory = null;
        this.expirationThread = new ExpirationThread();
        this.expirationThread.start();
        this.readTaskSchedulerThread = new ReadTaskSchedulerThread();
        this.readTaskSchedulerThread.start();
    }

    /**
     * Creates a manager exactly like the two-argument constructor, then installs a custom
     * factory for consumer connectors.
     *
     * @param kafkaRestConfig proxy configuration
     * @param metadataObserver used to check that topics exist
     * @param consumerFactory factory used by createConsumer; if null, the default connector
     *     creation is used
     */
    public ConsumerManager(KafkaRestConfig kafkaRestConfig, MetadataObserver metadataObserver, ConsumerFactory consumerFactory) {
        this(kafkaRestConfig, metadataObserver);
        this.consumerFactory = consumerFactory;
    }

    /**
     * Creates and registers a new consumer instance in the given group.
     *
     * <p>The instance name is the configured id if present, otherwise the configured name,
     * otherwise a generated one: {@code "rest-consumer-"}, followed by the proxy's
     * {@code ID_CONFIG} value and a dash when that value is non-empty, followed by a random UUID.
     *
     * <p>The name is reserved in the consumer map (with a {@code null} value) before the
     * consumer is built, so a concurrent request for the same name fails immediately. The
     * reservation is removed again if building the consumer fails.
     *
     * @param str the consumer group
     * @param consumerInstanceConfig the requested instance settings
     * @return the name of the created instance
     */
    public String createConsumer(String str, ConsumerInstanceConfig consumerInstanceConfig) {
        String name = consumerInstanceConfig.getName();
        if (consumerInstanceConfig.getId() != null) {
            name = consumerInstanceConfig.getId();
        }
        if (name == null) {
            String serverId = this.config.getString(KafkaRestConfig.ID_CONFIG);
            StringBuilder generated = new StringBuilder("rest-consumer-");
            if (!serverId.isEmpty()) {
                generated.append(serverId).append("-");
            }
            name = generated.append(UUID.randomUUID().toString()).toString();
        }
        ConsumerInstanceId consumerInstanceId = new ConsumerInstanceId(str, name);
        synchronized (this) {
            if (this.consumers.containsKey(consumerInstanceId)) {
                throw Errors.consumerAlreadyExistsException();
            }
            // Reserve the id; the real state replaces this placeholder once it has been built.
            this.consumers.put(consumerInstanceId, null);
        }
        boolean succeeded = false;
        try {
            log.debug("Creating consumer {} in group {}", name, str);
            Properties properties = (Properties) this.config.getOriginalProperties().clone();
            properties.setProperty(KafkaRestConfig.ZOOKEEPER_CONNECT_CONFIG, this.zookeeperConnect);
            properties.setProperty("group.id", str);
            if (consumerInstanceConfig.getId() != null) {
                properties.setProperty("consumer.id", consumerInstanceConfig.getId());
            }
            properties.setProperty("consumer.timeout.ms", Integer.toString(this.iteratorTimeoutMs));
            // Auto-commit is off unless the request explicitly sets it.
            if (consumerInstanceConfig.getAutoCommitEnable() != null) {
                properties.setProperty("auto.commit.enable", consumerInstanceConfig.getAutoCommitEnable());
            } else {
                properties.setProperty("auto.commit.enable", "false");
            }
            if (consumerInstanceConfig.getAutoOffsetReset() != null) {
                properties.setProperty("auto.offset.reset", consumerInstanceConfig.getAutoOffsetReset());
            }
            try {
                ConsumerConfig consumerConfig = new ConsumerConfig(properties);
                ConsumerConnector connector = this.consumerFactory == null
                        ? Consumer.createJavaConsumerConnector(consumerConfig)
                        : this.consumerFactory.createConsumer(consumerConfig);
                ConsumerState state = createConsumerState(consumerInstanceConfig, consumerInstanceId, connector);
                synchronized (this) {
                    this.consumers.put(consumerInstanceId, state);
                }
            } catch (InvalidConfigException e) {
                throw Errors.invalidConsumerConfigException(e);
            }
            succeeded = true;
            return name;
        } finally {
            if (!succeeded) {
                // Release the reserved id so the name can be used again.
                synchronized (this) {
                    this.consumers.remove(consumerInstanceId);
                }
            }
        }
    }

    /**
     * Builds the consumer state implementation that matches the requested embedded format.
     *
     * @param consumerInstanceConfig the requested instance settings, including the format
     * @param consumerInstanceId identifier of the new consumer
     * @param consumerConnector connector the state will wrap
     * @return a binary, Avro or JSON consumer state
     * @throws RestServerErrorException if the format is none of the supported ones, or if the
     *     per-consumer configuration cannot be built
     */
    private ConsumerState createConsumerState(ConsumerInstanceConfig consumerInstanceConfig, ConsumerInstanceId consumerInstanceId, ConsumerConnector consumerConnector) throws RestServerErrorException {
        final KafkaRestConfig perConsumerConfig = newConsumerConfig(this.config, consumerInstanceConfig);
        switch (consumerInstanceConfig.getFormat()) {
            case JSON:
                return new JsonConsumerState(perConsumerConfig, consumerInstanceId, consumerConnector);
            case AVRO:
                return new AvroConsumerState(perConsumerConfig, consumerInstanceId, consumerConnector);
            case BINARY:
                return new BinaryConsumerState(perConsumerConfig, consumerInstanceId, consumerConnector);
            default: {
                String message = String.format("Invalid embedded format %s for new consumer.", consumerInstanceConfig.getFormat());
                int status = Response.Status.INTERNAL_SERVER_ERROR.getStatusCode();
                throw new RestServerErrorException(message, status);
            }
        }
    }

    /**
     * Derives the configuration for a single consumer from the proxy configuration.
     *
     * <p>A copy of the proxy's original properties is passed, together with the instance
     * settings, through {@code ConsumerInstanceConfig.attachProxySpecificProperties}; the result
     * is wrapped in a new {@link KafkaRestConfig} that shares the proxy's clock.
     *
     * @param kafkaRestConfig the proxy configuration to start from
     * @param consumerInstanceConfig the requested instance settings
     * @return the configuration for the new consumer
     * @throws RestServerErrorException with status 400 if the resulting properties are rejected
     */
    public static KafkaRestConfig newConsumerConfig(KafkaRestConfig kafkaRestConfig, ConsumerInstanceConfig consumerInstanceConfig) throws RestServerErrorException {
        Properties baseProps = (Properties) kafkaRestConfig.getOriginalProperties().clone();
        Properties mergedProps = ConsumerInstanceConfig.attachProxySpecificProperties(baseProps, consumerInstanceConfig);
        try {
            return new KafkaRestConfig(mergedProps, kafkaRestConfig.m6getTime());
        } catch (RestConfigException cause) {
            String message = String.format("Invalid configuration for new consumer: %s", mergedProps);
            int status = Response.Status.BAD_REQUEST.getStatusCode();
            throw new RestServerErrorException(message, status, cause);
        }
    }

    /**
     * Starts an asynchronous read from a topic on an existing consumer instance.
     *
     * <p>If the consumer does not exist, is not of the expected state class, or the topic does
     * not exist, the callback is invoked with the corresponding error and {@code null} is
     * returned. Otherwise a read task is submitted to the executor and returned.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @param str3 the topic to read from
     * @param cls the consumer state class the instance must be an instance of
     * @param j forwarded unchanged to the {@code ConsumerReadTask} constructor
     * @param consumerReadCallback receives errors, and is handed to the read task
     * @return the submitted read task, or {@code null} when the request was rejected up front
     */
    public <KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> Future<List<ConsumerRecord<ClientKeyT, ClientValueT>>> readTopic(String str, String str2, String str3, Class<? extends ConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>> cls, long j, ConsumerReadCallback<ClientKeyT, ClientValueT> consumerReadCallback) {
        try {
            ConsumerState state = getConsumerInstance(str, str2);
            if (cls.isInstance(state)) {
                if (this.mdObserver.topicExists(str3)) {
                    ConsumerReadTask readTask = new ConsumerReadTask(state, str3, j, consumerReadCallback);
                    ReadTaskState taskState = new ReadTaskState(readTask, state, consumerReadCallback);
                    this.executor.submit(new RunnableReadTask(taskState));
                    return readTask;
                }
                consumerReadCallback.onCompletion(null, Errors.topicNotFoundException());
            } else {
                consumerReadCallback.onCompletion(null, Errors.consumerFormatMismatch());
            }
            return null;
        } catch (RestNotFoundException notFound) {
            consumerReadCallback.onCompletion(null, notFound);
            return null;
        }
    }

    public Future commitOffsets(String str, String str2, CommitCallback commitCallback) {
        try {
            return this.executor.submit(new Runnable() { // from class: io.confluent.kafkarest.ConsumerManager.2
                final /* synthetic */ ConsumerState val$state;
                final /* synthetic */ CommitCallback val$callback;

                AnonymousClass2(ConsumerState consumerState, CommitCallback commitCallback2) {
                    r5 = consumerState;
                    r6 = commitCallback2;
                }

                @Override // java.lang.Runnable
                public void run() {
                    try {
                        try {
                            r6.onCompletion(r5.commitOffsets(), null);
                            r5.updateExpiration();
                        } catch (Exception e) {
                            ConsumerManager.log.error("Failed to commit offsets for consumer " + r5.getId().toString(), e);
                            RestServerErrorException restServerErrorException = e;
                            if (!(e instanceof RestException)) {
                                restServerErrorException = Errors.kafkaErrorException(e);
                            }
                            r6.onCompletion(null, restServerErrorException);
                            r5.updateExpiration();
                        }
                    } catch (Throwable th) {
                        r5.updateExpiration();
                        throw th;
                    }
                }
            });
        } catch (RestNotFoundException e) {
            commitCallback2.onCompletion(null, e);
            return null;
        }
    }

    /**
     * Unregisters a consumer instance and closes it. The not-found error raised by the lookup
     * propagates to the caller when no such instance is registered.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     */
    public void deleteConsumer(String str, String str2) {
        log.debug("Destroying consumer " + str2 + " in group " + str);
        // The lookup removes the instance from the map before it is closed.
        ConsumerState removedState = getConsumerInstance(str, str2, true);
        removedState.close();
    }

    /**
     * Stops both background threads, closes every registered consumer, empties the consumer map
     * and shuts the executor down.
     */
    public void shutdown() {
        log.trace("Shutting down consumer expiration thread");
        this.expirationThread.shutdown();
        log.trace("Shutting down read task scheduler thread");
        this.readTaskSchedulerThread.shutdown();
        synchronized (this) {
            for (ConsumerState state : this.consumers.values()) {
                // createConsumer reserves ids with a null value while a consumer is being built;
                // such placeholders have nothing to close.
                if (state != null) {
                    state.close();
                }
            }
            this.consumers.clear();
            this.executor.shutdown();
        }
    }

    /**
     * Looks up a consumer instance, optionally unregistering it, and refreshes its expiration.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @param z when true the instance is removed from the map, otherwise it is only read
     * @return the consumer state; never null
     */
    private synchronized ConsumerState getConsumerInstance(String str, String str2, boolean z) {
        final ConsumerInstanceId key = new ConsumerInstanceId(str, str2);
        final ConsumerState state;
        if (z) {
            state = this.consumers.remove(key);
        } else {
            state = this.consumers.get(key);
        }
        if (state != null) {
            state.updateExpiration();
            return state;
        }
        // Covers both an unknown id and an id whose consumer is still being created.
        throw Errors.consumerInstanceNotFoundException();
    }

    /**
     * Looks up a consumer instance without removing it from the map.
     *
     * @param str the consumer group
     * @param str2 the consumer instance name
     * @return the consumer state, as returned by the three-argument overload with removal disabled
     */
    ConsumerState getConsumerInstance(String str, String str2) {
        return getConsumerInstance(str, str2, false);
    }

    // Empty static initializer; performs no work. Presumably left behind by the decompiler.
    static {
    }
}
