package io.confluent.kafkarest;

import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.rest.exceptions.RestException;
import io.confluent.rest.exceptions.RestNotFoundException;
import io.confluent.rest.exceptions.RestServerErrorException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Properties;
import java.util.UUID;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.core.Response;
import kafka.common.InvalidConfigException;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafkarest/ConsumerManager.class */
public class ConsumerManager {
    /** Class logger. */
    private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);
    /** REST proxy configuration; its original properties seed every new consumer's config. */
    private final KafkaRestConfig config;
    /** Clock obtained from the config; the expiration thread reads the current time from it. */
    private final Time time;
    /** ZooKeeper connection string copied into each new consumer's properties. */
    private final String zookeeperConnect;
    /** Used to check that a topic exists before a read is dispatched. */
    private final MetadataObserver mdObserver;
    /** Written to each consumer's {@code consumer.timeout.ms} property. */
    private final int iteratorTimeoutMs;
    /**
     * Registered consumers by id. A {@code null} value is a placeholder that reserves the id while
     * the consumer is still being created. Accessed only while holding this object's monitor.
     */
    private final Map<ConsumerInstanceId, ConsumerState> consumers;
    /** Worker threads that execute reads; picked round-robin via {@link #nextWorker}. */
    private final List<ConsumerWorker> workers;
    /** Monotonic counter used to pick the next worker. */
    private final AtomicInteger nextWorker;
    /** Single-thread pool that runs offset commits and closes expired consumers. */
    private final ExecutorService executor;
    /** Optional connector factory; when {@code null} the Kafka static factory is used instead. */
    private ConsumerFactory consumerFactory;
    /**
     * Consumers eligible for expiration, scanned by the expiration thread. A consumer is removed
     * when it is looked up for an operation and re-added by {@code updateExpiration}. No comparator
     * is supplied, so ordering relies on ConsumerState's natural ordering (presumably by expiration
     * time -- confirm in ConsumerState). Accessed only while holding this object's monitor.
     */
    private final PriorityQueue<ConsumerState> consumersByExpiration;
    /** Background daemon thread that evicts and closes expired consumers. */
    private final ExpirationThread expirationThread;

    /* loaded from: input_file:io/confluent/kafkarest/ConsumerManager$CommitCallback.class */
    /** Receives the outcome of an asynchronous offset commit requested through the manager. */
    public interface CommitCallback {
        /**
         * Called once the commit has finished.
         *
         * @param list the committed offsets on success, {@code null} on failure
         * @param exc  {@code null} on success, otherwise the error that prevented the commit
         */
        void onCompletion(List<TopicPartitionOffset> list, Exception exc);
    }

    /* loaded from: input_file:io/confluent/kafkarest/ConsumerManager$ConsumerFactory.class */
    /**
     * Pluggable source of consumer connectors. When a factory is supplied to the manager it is used
     * instead of the Kafka static factory method.
     */
    public interface ConsumerFactory {
        /**
         * Creates a connector for the given consumer configuration.
         *
         * @param consumerConfig fully populated configuration for the new consumer
         * @return the connector backing the new consumer instance
         */
        ConsumerConnector createConsumer(ConsumerConfig consumerConfig);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/confluent/kafkarest/ConsumerManager$ExpirationThread.class */
    /**
     * Daemon thread that evicts expired consumers.
     *
     * <p>The thread holds the enclosing manager's monitor while it scans
     * {@code consumersByExpiration}, and waits on that monitor until the next consumer is due to
     * expire (or indefinitely when the queue is empty). Other manager methods wake it with
     * {@code notifyAll()} after changing the queue.
     */
    public class ExpirationThread extends Thread {
        /** Cleared by {@link #shutdown()} to make the main loop exit. */
        AtomicBoolean isRunning;
        /** Counted down when {@link #run()} finishes so {@link #shutdown()} can block on it. */
        CountDownLatch shutdownLatch;

        public ExpirationThread() {
            super("Consumer Expiration Thread");
            this.isRunning = new AtomicBoolean(true);
            this.shutdownLatch = new CountDownLatch(1);
            setDaemon(true);
        }

        /**
         * Repeatedly removes every expired consumer from the manager's registry, schedules its
         * {@code close()} on the manager's executor, and then sleeps until the next expiration.
         */
        @Override
        public void run() {
            try {
                synchronized (ConsumerManager.this) {
                    while (this.isRunning.get()) {
                        try {
                            long now = ConsumerManager.this.time.milliseconds();
                            while (!ConsumerManager.this.consumersByExpiration.isEmpty()
                                    && ConsumerManager.this.consumersByExpiration.peek().expired(now)) {
                                final ConsumerState expired = ConsumerManager.this.consumersByExpiration.remove();
                                ConsumerManager.this.consumers.remove(expired.getId());
                                // Close off-thread so a slow close does not stall the scan while
                                // the manager's monitor is held.
                                ConsumerManager.this.executor.submit(new Runnable() {
                                    @Override
                                    public void run() {
                                        expired.close();
                                    }
                                });
                            }
                            long timeout = ConsumerManager.this.consumersByExpiration.isEmpty()
                                    ? Long.MAX_VALUE
                                    : ConsumerManager.this.consumersByExpiration.peek().untilExpiration(now);
                            ConsumerManager.this.wait(timeout);
                        } catch (InterruptedException ignored) {
                            // Deliberately ignored: shutdown() interrupts this thread after
                            // clearing isRunning, so the loop condition decides whether to exit.
                        }
                    }
                }
            } finally {
                // Always release shutdown(), even if the loop dies with an unchecked exception;
                // otherwise shutdown() would block forever on the latch.
                this.shutdownLatch.countDown();
            }
        }

        /**
         * Stops the thread and blocks until {@link #run()} has returned.
         *
         * @throws Error if the calling thread is interrupted while waiting; the interrupt flag is
         *               restored and the original exception is attached as the cause
         */
        public void shutdown() {
            try {
                this.isRunning.set(false);
                interrupt();
                this.shutdownLatch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new Error("Interrupted when shutting down consumer expiration thread.", e);
            }
        }
    }

    /* loaded from: input_file:io/confluent/kafkarest/ConsumerManager$ReadCallback.class */
    /**
     * Receives the outcome of an asynchronous topic read requested through the manager.
     *
     * @param <K> key type of the returned records
     * @param <V> value type of the returned records
     */
    public interface ReadCallback<K, V> {
        /**
         * Called once the read has finished.
         *
         * @param list the records that were read on success, {@code null} on failure
         * @param exc  {@code null} on success, otherwise the error that caused the read to fail
         */
        void onCompletion(List<? extends ConsumerRecord<K, V>> list, Exception exc);
    }

    /**
     * Builds a manager that creates consumers through the Kafka static connector factory.
     *
     * <p>Construction has side effects: it starts one {@code ConsumerWorker} thread per configured
     * consumer thread and starts the background expiration thread.
     *
     * @param kafkaRestConfig  REST proxy configuration
     * @param metadataObserver used to verify topic existence before reads
     */
    public ConsumerManager(KafkaRestConfig kafkaRestConfig, MetadataObserver metadataObserver) {
        // Collaborators and values read from the configuration.
        this.config = kafkaRestConfig;
        this.mdObserver = metadataObserver;
        this.time = kafkaRestConfig.m7getTime();
        this.zookeeperConnect = kafkaRestConfig.getString(KafkaRestConfig.ZOOKEEPER_CONNECT_CONFIG);
        this.iteratorTimeoutMs = kafkaRestConfig.getInt(KafkaRestConfig.CONSUMER_ITERATOR_TIMEOUT_MS_CONFIG);

        // Consumer bookkeeping.
        this.consumers = new HashMap<>();
        this.consumersByExpiration = new PriorityQueue<>();
        this.consumerFactory = null;

        // Read workers, each started as soon as it has been registered.
        this.workers = new Vector<>();
        for (int started = 0; started < kafkaRestConfig.getInt(KafkaRestConfig.CONSUMER_THREADS_CONFIG); started++) {
            ConsumerWorker worker = new ConsumerWorker(kafkaRestConfig);
            this.workers.add(worker);
            worker.start();
        }
        this.nextWorker = new AtomicInteger(0);
        this.executor = Executors.newFixedThreadPool(1);

        // Start the expiration thread last, once all state it touches is initialised.
        this.expirationThread = new ExpirationThread();
        this.expirationThread.start();
    }

    /**
     * Builds a manager that obtains consumer connectors from the supplied factory instead of the
     * Kafka static factory method.
     *
     * @param kafkaRestConfig  REST proxy configuration
     * @param metadataObserver used to verify topic existence before reads
     * @param consumerFactory  factory used to create connectors; {@code null} falls back to the
     *                         Kafka static factory
     */
    public ConsumerManager(KafkaRestConfig kafkaRestConfig, MetadataObserver metadataObserver, ConsumerFactory consumerFactory) {
        this(kafkaRestConfig, metadataObserver);
        // Assigned after delegation because the two-argument constructor resets it to null.
        this.consumerFactory = consumerFactory;
    }

    /**
     * Creates and registers a new consumer instance in the given group.
     *
     * <p>The instance name is the configured id if present, otherwise the configured name,
     * otherwise a generated name of the form {@code rest-consumer-[<server id>-]<uuid>}. The
     * generated-name prefix was reconstructed from a failed decompiler string concatenation, which
     * had left the prefix variable unassigned.
     *
     * <p>The id is reserved in the registry with a {@code null} placeholder before the (slow)
     * connector creation starts, so concurrent requests for the same id fail fast. The placeholder
     * is removed again if creation does not complete.
     *
     * @param str                    consumer group name
     * @param consumerInstanceConfig requested instance settings (id, name, format, offset options)
     * @return the name of the created consumer instance
     * @throws RestServerErrorException if the requested embedded format is not supported
     */
    public String createConsumer(String str, ConsumerInstanceConfig consumerInstanceConfig) {
        String name = consumerInstanceConfig.getName();
        if (consumerInstanceConfig.getId() != null) {
            name = consumerInstanceConfig.getId();
        }
        if (name == null) {
            String serverId = this.config.getString(KafkaRestConfig.ID_CONFIG);
            StringBuilder generated = new StringBuilder("rest-consumer-");
            if (!serverId.isEmpty()) {
                generated.append(serverId).append("-");
            }
            name = generated.append(UUID.randomUUID().toString()).toString();
        }

        ConsumerInstanceId consumerInstanceId = new ConsumerInstanceId(str, name);
        synchronized (this) {
            if (this.consumers.containsKey(consumerInstanceId)) {
                throw Errors.consumerAlreadyExistsException();
            }
            // Reserve the id; the real state replaces this placeholder on success.
            this.consumers.put(consumerInstanceId, null);
        }

        boolean succeeded = false;
        try {
            log.debug("Creating consumer " + name + " in group " + str);
            Properties properties = (Properties) this.config.getOriginalProperties().clone();
            properties.setProperty(KafkaRestConfig.ZOOKEEPER_CONNECT_CONFIG, this.zookeeperConnect);
            properties.setProperty("group.id", str);
            if (consumerInstanceConfig.getId() != null) {
                properties.setProperty("consumer.id", consumerInstanceConfig.getId());
            }
            properties.setProperty("consumer.timeout.ms", Integer.toString(this.iteratorTimeoutMs));
            if (consumerInstanceConfig.getAutoCommitEnable() != null) {
                properties.setProperty("auto.commit.enable", consumerInstanceConfig.getAutoCommitEnable());
            } else {
                properties.setProperty("auto.commit.enable", "false");
            }
            if (consumerInstanceConfig.getAutoOffsetReset() != null) {
                properties.setProperty("auto.offset.reset", consumerInstanceConfig.getAutoOffsetReset());
            }

            try {
                ConsumerConnector connector = this.consumerFactory == null
                        ? Consumer.createJavaConsumerConnector(new ConsumerConfig(properties))
                        : this.consumerFactory.createConsumer(new ConsumerConfig(properties));

                ConsumerState state;
                switch (consumerInstanceConfig.getFormat()) {
                    case BINARY:
                        state = new BinaryConsumerState(this.config, consumerInstanceId, connector);
                        break;
                    case AVRO:
                        state = new AvroConsumerState(this.config, consumerInstanceId, connector);
                        break;
                    case JSON:
                        state = new JsonConsumerState(this.config, consumerInstanceId, connector);
                        break;
                    default:
                        throw new RestServerErrorException("Invalid embedded format for new consumer.", Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
                }

                synchronized (this) {
                    this.consumers.put(consumerInstanceId, state);
                    this.consumersByExpiration.add(state);
                    // Wake the expiration thread so it recomputes its next wake-up time.
                    notifyAll();
                }
                succeeded = true;
                return name;
            } catch (InvalidConfigException e) {
                throw Errors.invalidConsumerConfigException(e);
            }
        } finally {
            if (!succeeded) {
                // Creation failed: release the reserved id so the name can be reused.
                synchronized (this) {
                    this.consumers.remove(consumerInstanceId);
                }
            }
        }
    }

    /**
     * Dispatches an asynchronous read of a topic for an existing consumer instance.
     *
     * <p>Looking the consumer up removes it from the expiration queue for the duration of the
     * operation. Every path that obtained the consumer therefore puts it back through
     * {@code updateExpiration}, including the early-exit error paths; otherwise a rejected request
     * would leave the consumer permanently exempt from expiration.
     *
     * @param str          consumer group name
     * @param str2         consumer instance name
     * @param str3         topic to read from
     * @param cls          consumer state type the caller expects; a mismatch is reported as an error
     * @param j            passed through unchanged to the worker's read (presumably a response size
     *                     limit -- confirm in ConsumerWorker)
     * @param readCallback receives either the records or the error
     * @return the worker's future for the read, or {@code null} when the request was rejected
     *         before being dispatched (the callback has already been invoked with the error)
     */
    public <KafkaK, KafkaV, ClientK, ClientV> Future readTopic(String str, String str2, String str3, Class<? extends ConsumerState<KafkaK, KafkaV, ClientK, ClientV>> cls, long j, final ReadCallback readCallback) {
        try {
            final ConsumerState consumerInstance = getConsumerInstance(str, str2);
            if (!cls.isInstance(consumerInstance)) {
                // Put the consumer back into the expiration queue before reporting the error.
                updateExpiration(consumerInstance);
                readCallback.onCompletion(null, Errors.consumerFormatMismatch());
                return null;
            }
            if (!this.mdObserver.topicExists(str3)) {
                updateExpiration(consumerInstance);
                readCallback.onCompletion(null, Errors.topicNotFoundException());
                return null;
            }

            // Mask off the sign bit so the round-robin index stays non-negative after the counter
            // overflows Integer.MAX_VALUE.
            int workerIndex = (this.nextWorker.getAndIncrement() & Integer.MAX_VALUE) % this.workers.size();
            return this.workers.get(workerIndex).readTopic(consumerInstance, str3, j, new ConsumerWorkerReadCallback<ClientK, ClientV>() {
                @Override
                public void onCompletion(List<? extends ConsumerRecord<ClientK, ClientV>> list, Exception exc) {
                    ConsumerManager.this.updateExpiration(consumerInstance);
                    if (exc == null) {
                        readCallback.onCompletion(list, null);
                        return;
                    }
                    // Surface REST exceptions unchanged; wrap anything else as a Kafka error.
                    Exception responseException = exc;
                    if (!(exc instanceof RestException)) {
                        responseException = Errors.kafkaErrorException(exc);
                    }
                    readCallback.onCompletion(null, responseException);
                }
            });
        } catch (RestNotFoundException e) {
            readCallback.onCompletion(null, e);
            return null;
        }
    }

    /**
     * Commits the offsets of an existing consumer instance asynchronously on the manager's
     * executor.
     *
     * @param str            consumer group name
     * @param str2           consumer instance name
     * @param commitCallback receives either the committed offsets or the error
     * @return the future of the submitted commit task, or {@code null} when the consumer was not
     *         found (the callback has already been invoked with the error)
     */
    public Future commitOffsets(String str, String str2, final CommitCallback commitCallback) {
        try {
            final ConsumerState consumerInstance = getConsumerInstance(str, str2);
            return this.executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        commitCallback.onCompletion(consumerInstance.commitOffsets(), null);
                    } catch (Exception e) {
                        ConsumerManager.log.error("Failed to commit offsets for consumer " + consumerInstance.getId().toString(), e);
                        // Surface REST exceptions unchanged; wrap anything else as a Kafka error.
                        Exception responseException = e;
                        if (!(e instanceof RestException)) {
                            responseException = Errors.kafkaErrorException(e);
                        }
                        commitCallback.onCompletion(null, responseException);
                    } finally {
                        // Runs exactly once on every path: puts the consumer back into the
                        // expiration queue that the lookup removed it from.
                        ConsumerManager.this.updateExpiration(consumerInstance);
                    }
                }
            });
        } catch (RestNotFoundException e) {
            commitCallback.onCompletion(null, e);
            return null;
        }
    }

    /**
     * Unregisters a consumer instance and closes it.
     *
     * @param str  consumer group name
     * @param str2 consumer instance name
     */
    public void deleteConsumer(String str, String str2) {
        log.debug("Destroying consumer " + str2 + " in group " + str);
        // The lookup removes the instance from the registry and the expiration queue, and throws
        // if no such instance is registered.
        final ConsumerState removed = getConsumerInstance(str, str2, true);
        removed.close();
    }

    /**
     * Shuts the manager down: stops all read workers, stops the expiration thread, closes every
     * registered consumer, and shuts down the executor.
     */
    public void shutdown() {
        log.debug("Shutting down consumers");
        synchronized (this) {
            // NOTE(review): workers are shut down while this monitor is held; if a worker's
            // shutdown blocks on an in-flight read whose callback needs this monitor
            // (updateExpiration), this could deadlock -- confirm against ConsumerWorker.shutdown().
            for (ConsumerWorker consumerWorker : this.workers) {
                log.trace("Shutting down worker " + consumerWorker.toString());
                consumerWorker.shutdown();
            }
            this.workers.clear();
        }
        log.trace("Shutting down consumer expiration thread");
        // Must run outside the monitor: the expiration thread needs it to leave wait() and exit.
        this.expirationThread.shutdown();
        synchronized (this) {
            for (ConsumerState state : this.consumers.values()) {
                // A null value is the placeholder createConsumer() inserts while a consumer is
                // still being built; there is nothing to close for it yet.
                if (state != null) {
                    state.close();
                }
            }
            this.consumers.clear();
            this.consumersByExpiration.clear();
            this.executor.shutdown();
        }
    }

    /**
     * Looks up a registered consumer and takes it out of the expiration queue.
     *
     * @param str  consumer group name
     * @param str2 consumer instance name
     * @param z    when {@code true} the consumer is also removed from the registry
     * @return the consumer's state, never {@code null}
     */
    private synchronized ConsumerState getConsumerInstance(String str, String str2, boolean z) {
        final ConsumerInstanceId key = new ConsumerInstanceId(str, str2);
        final ConsumerState state;
        if (z) {
            state = this.consumers.remove(key);
        } else {
            state = this.consumers.get(key);
        }
        if (state != null) {
            // Keep the expiration thread away from the consumer while the caller works with it.
            this.consumersByExpiration.remove(state);
            return state;
        }
        // Unknown id, or an id whose consumer is still being created (null placeholder).
        throw Errors.consumerInstanceNotFoundException();
    }

    /**
     * Looks up a registered consumer without unregistering it; the consumer is still taken out of
     * the expiration queue by the three-argument overload.
     *
     * @param str  consumer group name
     * @param str2 consumer instance name
     * @return the consumer's state, never {@code null}
     */
    private ConsumerState getConsumerInstance(String str, String str2) {
        final boolean removeFromRegistry = false;
        return getConsumerInstance(str, str2, removeFromRegistry);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Refreshes a consumer's expiration and puts it back into the expiration queue, then wakes the
     * expiration thread so it can recompute how long to sleep.
     *
     * @param consumerState the consumer whose operation has just finished
     */
    public synchronized void updateExpiration(ConsumerState consumerState) {
        // Refresh before inserting: the queue position is computed at insertion time.
        consumerState.updateExpiration();
        this.consumersByExpiration.offer(consumerState);
        this.notifyAll();
    }
}
