package org.apache.samza.system.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.common.TopicAndPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.KafkaConfig;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.BlockingEnvelopeMap;
import org.apache.samza.util.Clock;
import org.apache.samza.util.KafkaUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

/* loaded from: input_file:org/apache/samza/system/kafka/KafkaSystemConsumer.class */
/**
 * {@link SystemConsumer} implementation backed by a Kafka {@link Consumer}.
 *
 * <p>SSPs are registered (with a starting offset) before {@link #start()}; on start the Kafka
 * consumer is assigned the registered partitions, per-partition fetch thresholds are computed from
 * the configuration, each partition is positioned at its starting offset and the
 * {@link KafkaConsumerProxy} is started. Messages enter the underlying {@link BlockingEnvelopeMap}
 * through the {@link KafkaConsumerMessageSink} exposed by {@link #getMessageSink()}.
 *
 * <p>Every direct call this class makes on the Kafka consumer is wrapped in a
 * {@code synchronized (kafkaConsumer)} block.
 *
 * @param <K> key type of the Kafka consumer
 * @param <V> value type of the Kafka consumer
 */
public class KafkaSystemConsumer<K, V> extends BlockingEnvelopeMap implements SystemConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSystemConsumer.class);

    /** Total (all partitions) message-count threshold used when the config does not define one. */
    private static final long FETCH_THRESHOLD = 10000;

    /** Total (all partitions) byte threshold used when the config does not define one. */
    private static final long FETCH_THRESHOLD_BYTES = -1;

    protected final Consumer<K, V> kafkaConsumer;
    protected final String systemName;
    protected final String clientId;
    private final AtomicBoolean stopped;
    private final AtomicBoolean started;
    private final Config config;
    private final boolean fetchThresholdBytesEnabled;
    private final KafkaSystemConsumerMetrics metrics;
    final KafkaSystemConsumer<K, V>.KafkaConsumerMessageSink messageSink;
    private final KafkaConsumerProxy<K, V> proxy;

    /** Starting offset (as a decimal string) for every registered topic partition. */
    Map<TopicPartition, String> topicPartitionsToOffset;

    /** Per-partition queue limit in number of messages; set by {@link #start()}. */
    long perPartitionFetchThreshold;

    /** Per-partition queue limit in bytes; stays 0 unless the byte threshold is enabled. */
    long perPartitionFetchThresholdBytes;

    /**
     * Entry point through which fetched messages and high-watermark state are pushed into the
     * enclosing consumer's queues.
     */
    public class KafkaConsumerMessageSink {
        public KafkaConsumerMessageSink() {
        }

        /**
         * Forwards the flag to the enclosing consumer's {@code setIsAtHead}.
         *
         * @param systemStreamPartition partition whose state is being updated
         * @param z                     value passed through as the "is at head" flag
         */
        public void setIsAtHighWatermark(SystemStreamPartition systemStreamPartition, boolean z) {
            setIsAtHead(systemStreamPartition, z);
        }

        /**
         * Tells whether the queue of the given partition is still below its fetch threshold.
         *
         * <p>When the byte threshold is enabled the queued size in bytes is compared against
         * {@code perPartitionFetchThresholdBytes}; otherwise the number of queued messages is
         * compared against {@code perPartitionFetchThreshold}.
         *
         * @param systemStreamPartition partition to check
         * @return {@code true} if the queue is strictly below the applicable threshold
         */
        public boolean needsMoreMessages(SystemStreamPartition systemStreamPartition) {
            // Guarded so the queue lookups needed only for logging are skipped when debug is off.
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}: needsMoreMessages from following SSP: {}. fetchLimitByBytes enabled={}; messagesSizeInQueue={};(limit={}); messagesNumInQueue={}(limit={};",
                        KafkaSystemConsumer.this,
                        systemStreamPartition,
                        fetchThresholdBytesEnabled,
                        getMessagesSizeInQueue(systemStreamPartition),
                        perPartitionFetchThresholdBytes,
                        getNumMessagesInQueue(systemStreamPartition),
                        perPartitionFetchThreshold);
            }
            if (fetchThresholdBytesEnabled) {
                return getMessagesSizeInQueue(systemStreamPartition) < perPartitionFetchThresholdBytes;
            }
            return getNumMessagesInQueue(systemStreamPartition) < perPartitionFetchThreshold;
        }

        /**
         * Puts the envelope into the enclosing consumer's queue for the given partition.
         *
         * @param systemStreamPartition   partition the envelope belongs to
         * @param incomingMessageEnvelope envelope to enqueue
         * @throws SamzaException if the calling thread is interrupted while enqueueing; the
         *                        interrupt status is restored and the original exception is
         *                        attached as the cause
         */
        public void addMessage(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope incomingMessageEnvelope) {
            LOG.trace("{}: Incoming message ssp = {}: envelope = {}.",
                    KafkaSystemConsumer.this, systemStreamPartition, incomingMessageEnvelope);
            try {
                put(systemStreamPartition, incomingMessageEnvelope);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the calling thread and preserve the cause.
                Thread.currentThread().interrupt();
                throw new SamzaException(
                        String.format("%s: Consumer was interrupted while trying to add message with offset %s for ssp %s",
                                KafkaSystemConsumer.this, incomingMessageEnvelope.getOffset(), systemStreamPartition),
                        e);
            }
        }
    }

    /**
     * Creates the consumer, its message sink and its proxy. Nothing is started here.
     *
     * @param consumer                   Kafka consumer used for assign/seek/close
     * @param str                        system name
     * @param config                     job configuration
     * @param str2                       client id
     * @param kafkaConsumerProxyFactory  factory used to create the proxy for this consumer
     * @param kafkaSystemConsumerMetrics metrics holder; also supplies the registry and group name
     *                                   passed to the superclass
     * @param clock                      clock passed to the superclass
     */
    public KafkaSystemConsumer(Consumer<K, V> consumer, String str, Config config, String str2, KafkaConsumerProxyFactory<K, V> kafkaConsumerProxyFactory, KafkaSystemConsumerMetrics kafkaSystemConsumerMetrics, Clock clock) {
        super(kafkaSystemConsumerMetrics.registry(), clock, kafkaSystemConsumerMetrics.getClass().getName());
        this.stopped = new AtomicBoolean(false);
        this.started = new AtomicBoolean(false);
        this.topicPartitionsToOffset = new HashMap<>();
        this.kafkaConsumer = consumer;
        this.clientId = str2;
        this.systemName = str;
        this.config = config;
        this.metrics = kafkaSystemConsumerMetrics;
        this.fetchThresholdBytesEnabled = new KafkaConfig(config).isConsumerFetchThresholdBytesEnabled(str);
        // The factory receives this (still under construction) instance, so every field above,
        // including the sink, is assigned before the proxy is created.
        this.messageSink = new KafkaConsumerMessageSink();
        this.proxy = kafkaConsumerProxyFactory.create(this);
        LOG.info("{}: Created proxy {} ", this, this.proxy);
    }

    /**
     * Instantiates a {@link KafkaConsumer} from the given properties.
     *
     * @param str     system name, used for logging only
     * @param hashMap consumer properties handed to the {@link KafkaConsumer} constructor
     * @return a new Kafka consumer
     */
    public static <K, V> KafkaConsumer<K, V> createKafkaConsumerImpl(String str, HashMap<String, Object> hashMap) {
        LOG.info("Instantiating KafkaConsumer for systemName {} with properties {}", str, hashMap);
        return new KafkaConsumer<>(hashMap);
    }

    /**
     * Starts the consumer: assigns the registered partitions, computes the fetch thresholds, seeks
     * to the starting offsets and starts the proxy.
     *
     * <p>A second call, or a call after {@link #stop()}, only logs and returns.
     */
    public void start() {
        if (!this.started.compareAndSet(false, true)) {
            LOG.warn("{}: Attempting to start the consumer for the second (or more) time.", this);
            return;
        }
        if (this.stopped.get()) {
            LOG.error("{}: Attempting to start a stopped consumer", this);
            return;
        }
        startSubscription();
        setFetchThresholds();
        startConsumer();
        LOG.info("{}: Consumer started", this);
    }

    /**
     * Assigns all registered topic partitions to the Kafka consumer.
     *
     * @throws SamzaException wrapping any exception raised by the assignment
     */
    private void startSubscription() {
        LOG.info("{}: Consumer subscribes to {}", this, this.topicPartitionsToOffset.keySet());
        try {
            synchronized (this.kafkaConsumer) {
                this.kafkaConsumer.assign(this.topicPartitionsToOffset.keySet());
            }
        } catch (Exception e) {
            throw new SamzaException("Consumer subscription failed for " + this, e);
        }
    }

    /**
     * Seeks every registered partition to its starting offset, registers it with the proxy and
     * then starts the proxy if it is not already running.
     *
     * <p>Having no registered partitions is logged as an error but does not prevent the proxy
     * from being started.
     *
     * @throws NumberFormatException if a registered offset is not a valid long
     * @throws SamzaException        if seeking or registering a partition with the proxy fails
     */
    void startConsumer() {
        if (this.topicPartitionsToOffset.isEmpty()) {
            LOG.error("{}: Consumer is not subscribed to any SSPs", this);
        }
        this.topicPartitionsToOffset.forEach((topicPartition, offsetString) -> {
            long startingOffset = Long.parseLong(offsetString);
            try {
                synchronized (this.kafkaConsumer) {
                    this.kafkaConsumer.seek(topicPartition, startingOffset);
                }
                LOG.info("{}: Changing consumer's starting offset for partition {} to {}", this, topicPartition, offsetString);
                this.proxy.addTopicPartition(KafkaUtil.toSystemStreamPartition(this.systemName, topicPartition), startingOffset);
            } catch (Exception e) {
                String message = String.format("%s: Got Exception while seeking to %s for partition %s", this, offsetString, topicPartition);
                LOG.error(message, e);
                throw new SamzaException(message, e);
            }
        });
        if (this.proxy != null && !this.proxy.isRunning()) {
            LOG.info("{}: Starting proxy {}", this, this.proxy);
            this.proxy.start();
        }
    }

    /**
     * Derives the per-partition thresholds from the configured (or default) totals by dividing
     * them evenly across the registered partitions. With no registered partitions the
     * per-partition values are left untouched.
     */
    private void setFetchThresholds() {
        KafkaConfig kafkaConfig = new KafkaConfig(this.config);

        long fetchThreshold = FETCH_THRESHOLD;
        Option<String> configuredThreshold = kafkaConfig.getConsumerFetchThreshold(this.systemName);
        if (configuredThreshold.isDefined()) {
            fetchThreshold = Long.parseLong(configuredThreshold.get());
        }

        long fetchThresholdBytes = FETCH_THRESHOLD_BYTES;
        Option<String> configuredThresholdBytes = kafkaConfig.getConsumerFetchThresholdBytes(this.systemName);
        if (configuredThresholdBytes.isDefined()) {
            fetchThresholdBytes = Long.parseLong(configuredThresholdBytes.get());
        }

        int numPartitions = this.topicPartitionsToOffset.size();
        if (numPartitions > 0) {
            this.perPartitionFetchThreshold = fetchThreshold / numPartitions;
            if (this.fetchThresholdBytesEnabled) {
                // NOTE(review): only half of the byte budget is distributed; presumably this
                // accounts for messages being buffered twice - confirm.
                this.perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / numPartitions;
            }
        }
        LOG.info("{}: fetchThresholdBytes = {}; fetchThreshold={}; numPartitions={}, perPartitionFetchThreshold={}, perPartitionFetchThresholdBytes(0 if disabled)={}",
                this, fetchThresholdBytes, fetchThreshold, numPartitions, this.perPartitionFetchThreshold, this.perPartitionFetchThresholdBytes);
    }

    /**
     * Publicly exposes the superclass's {@code setFailureCause}.
     *
     * @param th the failure to record
     */
    public void setFailureCause(Throwable th) {
        super.setFailureCause(th);
    }

    /**
     * Stops the proxy (if any) and then closes the Kafka consumer. Only the first call has an
     * effect; later calls log a warning. A failure while closing the Kafka consumer is logged and
     * not rethrown.
     */
    public void stop() {
        if (!this.stopped.compareAndSet(false, true)) {
            LOG.warn("{}: Attempting to stop stopped consumer.", this);
            return;
        }
        LOG.info("{}: Stopping Samza kafkaConsumer ", this);
        if (this.proxy != null) {
            LOG.info("{}: Stopping proxy {}", this, this.proxy);
            // 60 seconds expressed in milliseconds; presumably the proxy's shutdown timeout.
            this.proxy.stop(TimeUnit.SECONDS.toMillis(60L));
        }
        try {
            synchronized (this.kafkaConsumer) {
                LOG.info("{}: Closing kafkaSystemConsumer {}", this, this.kafkaConsumer);
                this.kafkaConsumer.close();
            }
        } catch (Exception e) {
            LOG.warn("{}: Failed to stop KafkaSystemConsumer.", this, e);
        }
    }

    /**
     * Registers an SSP and its starting offset. Must be called before {@link #start()}.
     *
     * <p>SSPs belonging to a different system are ignored with a warning. If the same topic
     * partition is registered more than once, the numerically smallest offset is kept.
     *
     * @param systemStreamPartition SSP to consume from
     * @param str                   starting offset as a decimal string
     * @throws SamzaException if the consumer has already been started
     */
    public void register(SystemStreamPartition systemStreamPartition, String str) {
        if (this.started.get()) {
            throw new SamzaException(String.format("KafkaSystemConsumer: %s had started. Registration of ssp: %s, offset: %s failed.", this, systemStreamPartition, str));
        }
        if (!Objects.equals(systemStreamPartition.getSystem(), this.systemName)) {
            LOG.warn("{}: ignoring SSP {}, because this consumer's system doesn't match.", this, systemStreamPartition);
            return;
        }
        LOG.info("{}: Registering ssp: {} with offset: {}", this, systemStreamPartition, str);
        super.register(systemStreamPartition, str);

        TopicPartition topicPartition = toTopicPartition(systemStreamPartition);
        String existingOffset = this.topicPartitionsToOffset.get(topicPartition);
        // Keep the earliest offset seen for this partition.
        if (existingOffset == null || compareOffsets(existingOffset, str) > 0) {
            this.topicPartitionsToOffset.put(topicPartition, str);
        }
        this.metrics.registerTopicAndPartition(toTopicAndPartition(topicPartition));
    }

    /**
     * Numerically compares two offsets given as decimal strings.
     *
     * @return negative, zero or positive as {@code str} is less than, equal to or greater than
     *         {@code str2}
     * @throws NumberFormatException if either string is not a valid long
     */
    private static int compareOffsets(String str, String str2) {
        return Long.compare(Long.parseLong(str), Long.parseLong(str2));
    }

    /** Returns {@code "<systemName>:<clientId>"}; used as the prefix of this class's log lines. */
    public String toString() {
        return String.format("%s:%s", this.systemName, this.clientId);
    }

    /**
     * Delegates to the superclass's {@code poll} while the proxy is running.
     *
     * @throws SamzaException       if the proxy is not running; this consumer is stopped first and
     *                              the proxy's failure cause is attached
     * @throws InterruptedException as declared by the superclass's {@code poll}
     */
    public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> set, long j) throws InterruptedException {
        if (this.proxy.isRunning()) {
            return super.poll(set, j);
        }
        LOG.info("{}: KafkaConsumerProxy is not running. Stopping the consumer.", this);
        stop();
        throw new SamzaException(String.format("%s: KafkaConsumerProxy has stopped.", this), this.proxy.getFailureCause());
    }

    /** Converts a {@link TopicPartition} into a {@link TopicAndPartition} with the same topic and partition. */
    public static TopicAndPartition toTopicAndPartition(TopicPartition topicPartition) {
        return new TopicAndPartition(topicPartition.topic(), topicPartition.partition());
    }

    /** Builds a {@link TopicPartition} from the SSP's stream name and partition id. */
    public static TopicPartition toTopicPartition(SystemStreamPartition systemStreamPartition) {
        return new TopicPartition(systemStreamPartition.getStream(), systemStreamPartition.getPartition().getPartitionId());
    }

    /** Returns the system name this consumer was created for. */
    public String getSystemName() {
        return this.systemName;
    }

    /** Returns the sink through which messages are added to this consumer. */
    public KafkaSystemConsumer<K, V>.KafkaConsumerMessageSink getMessageSink() {
        return this.messageSink;
    }
}
