package io.confluent.kafkarest;

import io.confluent.kafka.common.MessageStreamsExistException;
import io.confluent.kafka.consumer.KafkaStream;
import io.confluent.kafka.javaapi.consumer.ConsumerConnector;
import io.confluent.kafka.message.MessageAndMetadata;
import io.confluent.kafka.serializer.Decoder;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.Vector;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/* loaded from: input_file:io/confluent/kafkarest/ConsumerState.class */
public abstract class ConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> {
    private KafkaRestConfig config;
    private ConsumerInstanceId instanceId;
    private ConsumerConnector consumer;
    volatile long expiration;
    private Map<String, ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>> topics = new HashMap();
    private ReadWriteLock lock = new ReentrantReadWriteLock();

    public ConsumerState(KafkaRestConfig kafkaRestConfig, ConsumerInstanceId consumerInstanceId, ConsumerConnector consumerConnector) {
        this.config = kafkaRestConfig;
        this.instanceId = consumerInstanceId;
        this.consumer = consumerConnector;
        this.expiration = kafkaRestConfig.getTime().milliseconds() + kafkaRestConfig.getInt("consumer.instance.timeout.ms").intValue();
    }

    public ConsumerInstanceId getId() {
        return this.instanceId;
    }

    protected abstract Decoder<KafkaKeyT> getKeyDecoder();

    protected abstract Decoder<KafkaValueT> getValueDecoder();

    public abstract ConsumerRecordAndSize<ClientKeyT, ClientValueT> createConsumerRecord(MessageAndMetadata<KafkaKeyT, KafkaValueT> messageAndMetadata);

    public void startRead(ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> consumerTopicState) {
        this.lock.readLock().lock();
        consumerTopicState.lock();
    }

    public void finishRead(ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> consumerTopicState) {
        consumerTopicState.unlock();
        this.lock.readLock().unlock();
    }

    public List<TopicPartitionOffset> commitOffsets() {
        this.lock.writeLock().lock();
        try {
            this.consumer.commitOffsets();
            return getOffsets(true);
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    public void close() {
        this.lock.writeLock().lock();
        try {
            this.consumer.shutdown();
            this.consumer = null;
            this.topics = null;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    public boolean expired(long j) {
        return this.expiration <= j;
    }

    public void updateExpiration() {
        this.expiration = this.config.getTime().milliseconds() + this.config.getInt("consumer.instance.timeout.ms").intValue();
    }

    public KafkaRestConfig getConfig() {
        return this.config;
    }

    public void setConfig(KafkaRestConfig kafkaRestConfig) {
        this.config = kafkaRestConfig;
    }

    public ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> getOrCreateTopicState(String str) {
        this.lock.readLock().lock();
        try {
            if (this.topics == null) {
                return null;
            }
            ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> consumerTopicState = this.topics.get(str);
            if (consumerTopicState != null) {
                this.lock.readLock().unlock();
                return consumerTopicState;
            }
            this.lock.readLock().unlock();
            this.lock.writeLock().lock();
            try {
                try {
                    if (this.topics == null) {
                        return null;
                    }
                    ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> consumerTopicState2 = this.topics.get(str);
                    if (consumerTopicState2 != null) {
                        this.lock.writeLock().unlock();
                        return consumerTopicState2;
                    }
                    TreeMap treeMap = new TreeMap();
                    treeMap.put(str, 1);
                    ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> consumerTopicState3 = new ConsumerTopicState<>((KafkaStream) ((List) this.consumer.createMessageStreams(treeMap, getKeyDecoder(), getValueDecoder()).get(str)).get(0));
                    this.topics.put(str, consumerTopicState3);
                    this.lock.writeLock().unlock();
                    return consumerTopicState3;
                } catch (MessageStreamsExistException e) {
                    throw Errors.consumerAlreadySubscribedException();
                }
            } finally {
                this.lock.writeLock().unlock();
            }
        } finally {
            this.lock.readLock().unlock();
        }
    }

    private List<TopicPartitionOffset> getOffsets(boolean z) {
        Long l;
        Vector vector = new Vector();
        for (Map.Entry<String, ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>> entry : this.topics.entrySet()) {
            ConsumerTopicState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> value = entry.getValue();
            value.lock();
            try {
                for (Map.Entry<Integer, Long> entry2 : value.getConsumedOffsets().entrySet()) {
                    Integer key = entry2.getKey();
                    Long value2 = entry2.getValue();
                    if (z) {
                        value.getCommittedOffsets().put(key, value2);
                        l = value2;
                    } else {
                        l = value.getCommittedOffsets().get(key);
                    }
                    vector.add(new TopicPartitionOffset(entry.getKey(), key.intValue(), value2.longValue(), l == null ? -1L : l.longValue()));
                }
            } finally {
                value.unlock();
            }
        }
        return vector;
    }
}
