/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internal;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.connectors.kafka.internal.Handover;
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;

/**
 * Daemon thread that owns a {@link KafkaConsumer} and feeds the records it polls into a
 * {@link Handover}.
 *
 * <p>The thread creates the consumer from the supplied properties, assigns the subscribed
 * partitions through the {@link KafkaConsumerCallBridge}, optionally registers the consumer's
 * metrics, positions every partition, and then loops: send a pending asynchronous offset commit
 * (if any), poll for records, and hand the batch over. Any {@link Throwable} raised while doing so
 * is forwarded via {@link Handover#reportError(Throwable)}.
 *
 * <p>Other threads interact with this thread only through {@link #shutdown()} and
 * {@link #setOffsetsToCommit(Map)}; both communicate through volatile / atomic fields and by
 * waking up the handover producer and the consumer.
 */
public class KafkaConsumerThread
extends Thread {
    /** Logger supplied by the creator of this thread. */
    private final Logger log;
    /** Hand-over point that receives polled record batches and reported errors. */
    private final Handover handover;
    /** Offsets waiting to be committed; {@code null} when nothing is pending. */
    private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
    /** Configuration used to construct the {@link KafkaConsumer}. */
    private final Properties kafkaProperties;
    /** Partitions this thread is assigned to, together with their offset state. */
    private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions;
    /** Metric group under which the consumer's metrics are registered as gauges. */
    private final MetricGroup kafkaMetricGroup;
    /** Indirection used to assign partitions to the consumer. */
    private final KafkaConsumerCallBridge consumerCallBridge;
    /** Timeout passed to every {@code poll} call. */
    private final long pollTimeout;
    /** Whether the consumer's metrics are registered in {@link #kafkaMetricGroup}. */
    private final boolean useMetrics;
    /** Set once the consumer is fully set up; {@code null} before that. Read by other threads. */
    private volatile KafkaConsumer<byte[], byte[]> consumer;
    /** Cleared by {@link #shutdown()} to make the main loop exit. */
    private volatile boolean running;
    /** True between sending an async commit and the completion callback firing. */
    private volatile boolean commitInProgress;

    /**
     * Creates the thread; the Kafka consumer itself is only created once the thread runs.
     *
     * @param log logger to write to; must not be null
     * @param handover receiver of polled records and errors; must not be null
     * @param kafkaProperties configuration for the {@link KafkaConsumer}; must not be null
     * @param subscribedPartitions partitions to consume; must not be null
     * @param kafkaMetricGroup group in which consumer metrics are registered; must not be null
     * @param consumerCallBridge bridge used to assign partitions; must not be null
     * @param threadName name given to this thread
     * @param pollTimeout timeout passed to each {@code poll} call
     * @param useMetrics whether to register the consumer's metrics
     */
    public KafkaConsumerThread(Logger log, Handover handover, Properties kafkaProperties, KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions, MetricGroup kafkaMetricGroup, KafkaConsumerCallBridge consumerCallBridge, String threadName, long pollTimeout, boolean useMetrics) {
        super(threadName);
        this.setDaemon(true);
        this.log = Preconditions.checkNotNull(log);
        this.handover = Preconditions.checkNotNull(handover);
        this.kafkaProperties = Preconditions.checkNotNull(kafkaProperties);
        this.subscribedPartitions = Preconditions.checkNotNull(subscribedPartitions);
        this.kafkaMetricGroup = Preconditions.checkNotNull(kafkaMetricGroup);
        this.consumerCallBridge = Preconditions.checkNotNull(consumerCallBridge);
        this.pollTimeout = pollTimeout;
        this.useMetrics = useMetrics;
        this.nextOffsetsToCommit = new AtomicReference<Map<TopicPartition, OffsetAndMetadata>>();
        this.running = true;
    }

    /**
     * Sets up the consumer and runs the poll / hand-over loop until {@link #shutdown()} is called
     * or an error occurs.
     *
     * <p>If the consumer cannot be constructed, the error is reported to the handover and the
     * method returns. Once the consumer exists, the handover and the consumer are always closed
     * on exit, and any failure is reported to the handover.
     *
     * <p>NOTE(review): this source was reconstructed by a decompiler, which warned that it removed
     * a self-catching try block here ("possible behaviour change") - compare against the original
     * sources if exact exception handling matters.
     */
    @Override
    public void run() {
        // Shut down before the thread got to run: nothing was created, nothing to clean up.
        if (!this.running) {
            return;
        }
        final Handover handover = this.handover;

        final KafkaConsumer<byte[], byte[]> consumer;
        try {
            consumer = new KafkaConsumer<byte[], byte[]>(this.kafkaProperties);
        }
        catch (Throwable t) {
            handover.reportError(t);
            return;
        }

        // From here on the consumer exists, so every exit path must go through the finally block.
        try {
            final CommitCallback offsetCommitCallback = new CommitCallback();
            this.consumerCallBridge.assignPartitions(consumer, KafkaConsumerThread.convertKafkaPartitions(this.subscribedPartitions));

            if (this.useMetrics) {
                final Map<MetricName, ? extends Metric> metrics = consumer.metrics();
                if (metrics == null) {
                    this.log.info("Consumer implementation does not support metrics");
                } else {
                    // Expose each consumer metric as a gauge named after the metric.
                    for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
                        this.kafkaMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
                    }
                }
            }

            // Early out before touching offsets if shutdown was requested during setup.
            if (!this.running) {
                return;
            }

            // The partition state's offset is kept one below the consumer position: a defined
            // offset is resumed at offset + 1, an undefined one is initialised to position - 1.
            for (KafkaTopicPartitionState<TopicPartition> partition : this.subscribedPartitions) {
                if (partition.isOffsetDefined()) {
                    this.log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer to position {}", new Object[]{partition.getKafkaPartitionHandle(), partition.getOffset(), partition.getOffset() + 1L});
                    consumer.seek(partition.getKafkaPartitionHandle(), partition.getOffset() + 1L);
                    continue;
                }
                final long fetchedOffset = consumer.position(partition.getKafkaPartitionHandle());
                this.log.info("Partition {} has no initial offset; the consumer has position {}, so the initial offset will be set to {}", new Object[]{partition.getKafkaPartitionHandle(), fetchedOffset, fetchedOffset - 1L});
                partition.setOffset(fetchedOffset - 1L);
            }

            // Publish the consumer only now, so that shutdown() / setOffsetsToCommit() cannot
            // call wakeup() on it while the setup above is still in progress.
            this.consumer = consumer;

            // Batch that was polled but not yet handed over; null means "poll again".
            ConsumerRecords<byte[], byte[]> records = null;
            while (this.running) {
                // Only take the pending offsets when no commit is in flight, so they stay queued
                // (and may be replaced by newer ones) until the previous commit completes.
                if (!this.commitInProgress) {
                    final Map<TopicPartition, OffsetAndMetadata> toCommit = this.nextOffsetsToCommit.getAndSet(null);
                    if (toCommit != null) {
                        this.log.debug("Sending async offset commit request to Kafka broker");
                        this.commitInProgress = true;
                        consumer.commitAsync(toCommit, offsetCommitCallback);
                    }
                }

                // Poll only if the previous batch was handed over successfully.
                if (records == null) {
                    try {
                        records = consumer.poll(this.pollTimeout);
                    }
                    catch (WakeupException ignored) {
                        // Woken up by shutdown() or setOffsetsToCommit(): re-check the loop
                        // condition and any pending commit.
                        continue;
                    }
                }

                try {
                    handover.produce(records);
                    records = null;
                }
                catch (Handover.WakeupException ignored) {
                    // Deliberately ignored: the producer was woken up before the batch was
                    // handed over. Keep 'records' and retry on the next iteration.
                }
            }
        }
        catch (Throwable t) {
            handover.reportError(t);
        }
        finally {
            handover.close();
            try {
                consumer.close();
            }
            catch (Throwable t) {
                this.log.warn("Error while closing Kafka consumer", t);
            }
        }
    }

    /**
     * Asks the thread to stop: clears the running flag and wakes up both the handover producer
     * and, if it has been published already, the Kafka consumer.
     *
     * <p>The consumer is not closed here; {@link #run()} closes it when the loop exits.
     */
    public void shutdown() {
        this.running = false;
        this.handover.wakeupProducer();
        // Single volatile read; the field stays null until run() has finished its setup.
        final KafkaConsumer<byte[], byte[]> activeConsumer = this.consumer;
        if (activeConsumer != null) {
            activeConsumer.wakeup();
        }
    }

    /**
     * Stores offsets to be committed asynchronously by the consumer thread, replacing (and
     * thereby skipping) any offsets that were still waiting to be committed, and wakes the
     * thread up.
     *
     * @param offsetsToCommit the offsets to commit next
     */
    public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
        // A non-null previous value means the earlier offsets were never picked up by the loop.
        if (this.nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) {
            this.log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.");
        }
        this.handover.wakeupProducer();
        // Single volatile read; the field stays null until run() has finished its setup.
        final KafkaConsumer<byte[], byte[]> activeConsumer = this.consumer;
        if (activeConsumer != null) {
            activeConsumer.wakeup();
        }
    }

    /**
     * Extracts the Kafka partition handle of every partition state, preserving array order.
     *
     * @param partitions the partition states to convert
     * @return a new list with one {@link TopicPartition} per input element
     */
    private static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] partitions) {
        ArrayList<TopicPartition> result = new ArrayList<TopicPartition>(partitions.length);
        for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
            result.add(p.getKafkaPartitionHandle());
        }
        return result;
    }

    /**
     * Completion callback for asynchronous offset commits: clears the in-progress flag so the
     * main loop may send the next commit, and logs a warning if the commit failed.
     */
    private class CommitCallback
    implements OffsetCommitCallback {
        private CommitCallback() {
        }

        /**
         * Marks the commit as finished and logs a warning on failure.
         *
         * @param offsets the offsets the commit was issued for (unused)
         * @param ex the failure, or {@code null} if the commit succeeded
         */
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
            KafkaConsumerThread.this.commitInProgress = false;
            if (ex != null) {
                KafkaConsumerThread.this.log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
            }
        }
    }
}

