package org.apache.flink.streaming.connectors.kafka.internal;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.connectors.kafka.internal.Handover;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.class */
public class KafkaConsumerThread extends Thread {
    private final Logger log;
    private final Handover handover;
    private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit;
    private final Properties kafkaProperties;
    private final KafkaTopicPartitionState<TopicPartition>[] subscribedPartitions;
    private final MetricGroup kafkaMetricGroup;
    private final KafkaConsumerCallBridge consumerCallBridge;
    private final long pollTimeout;
    private final boolean useMetrics;
    private volatile KafkaConsumer<byte[], byte[]> consumer;
    private volatile boolean running;
    private volatile boolean commitInProgress;

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread$CommitCallback.class */
    private class CommitCallback implements OffsetCommitCallback {
        private CommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            KafkaConsumerThread.this.commitInProgress = false;
            if (exc != null) {
                KafkaConsumerThread.this.log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", exc);
            }
        }
    }

    public KafkaConsumerThread(Logger logger, Handover handover, Properties properties, KafkaTopicPartitionState<TopicPartition>[] kafkaTopicPartitionStateArr, MetricGroup metricGroup, KafkaConsumerCallBridge kafkaConsumerCallBridge, String str, long j, boolean z) {
        super(str);
        setDaemon(true);
        this.log = (Logger) Preconditions.checkNotNull(logger);
        this.handover = (Handover) Preconditions.checkNotNull(handover);
        this.kafkaProperties = (Properties) Preconditions.checkNotNull(properties);
        this.subscribedPartitions = (KafkaTopicPartitionState[]) Preconditions.checkNotNull(kafkaTopicPartitionStateArr);
        this.kafkaMetricGroup = (MetricGroup) Preconditions.checkNotNull(metricGroup);
        this.consumerCallBridge = (KafkaConsumerCallBridge) Preconditions.checkNotNull(kafkaConsumerCallBridge);
        this.pollTimeout = j;
        this.useMetrics = z;
        this.nextOffsetsToCommit = new AtomicReference<>();
        this.running = true;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        Map<TopicPartition, OffsetAndMetadata> andSet;
        if (this.running) {
            Handover handover = this.handover;
            try {
                KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(this.kafkaProperties);
                try {
                    try {
                        CommitCallback commitCallback = new CommitCallback();
                        this.consumerCallBridge.assignPartitions(kafkaConsumer, convertKafkaPartitions(this.subscribedPartitions));
                        if (this.useMetrics) {
                            Map metrics = kafkaConsumer.metrics();
                            if (metrics == null) {
                                this.log.info("Consumer implementation does not support metrics");
                            } else {
                                for (Map.Entry entry : metrics.entrySet()) {
                                    this.kafkaMetricGroup.gauge(((MetricName) entry.getKey()).name(), new KafkaMetricWrapper((Metric) entry.getValue()));
                                }
                            }
                        }
                        if (!this.running) {
                            handover.close();
                            try {
                                kafkaConsumer.close();
                                return;
                            } catch (Throwable th) {
                                this.log.warn("Error while closing Kafka consumer", th);
                                return;
                            }
                        }
                        for (KafkaTopicPartitionState<TopicPartition> kafkaTopicPartitionState : this.subscribedPartitions) {
                            if (kafkaTopicPartitionState.isOffsetDefined()) {
                                this.log.info("Partition {} has restored initial offsets {} from checkpoint / savepoint; seeking the consumer to position {}", new Object[]{kafkaTopicPartitionState.getKafkaPartitionHandle(), Long.valueOf(kafkaTopicPartitionState.getOffset()), Long.valueOf(kafkaTopicPartitionState.getOffset() + 1)});
                                kafkaConsumer.seek((TopicPartition) kafkaTopicPartitionState.getKafkaPartitionHandle(), kafkaTopicPartitionState.getOffset() + 1);
                            } else {
                                long position = kafkaConsumer.position((TopicPartition) kafkaTopicPartitionState.getKafkaPartitionHandle());
                                this.log.info("Partition {} has no initial offset; the consumer has position {}, so the initial offset will be set to {}", new Object[]{kafkaTopicPartitionState.getKafkaPartitionHandle(), Long.valueOf(position), Long.valueOf(position - 1)});
                                kafkaTopicPartitionState.setOffset(position - 1);
                            }
                        }
                        this.consumer = kafkaConsumer;
                        ConsumerRecords<byte[], byte[]> consumerRecords = null;
                        while (this.running) {
                            if (!this.commitInProgress && (andSet = this.nextOffsetsToCommit.getAndSet(null)) != null) {
                                this.log.debug("Sending async offset commit request to Kafka broker");
                                this.commitInProgress = true;
                                kafkaConsumer.commitAsync(andSet, commitCallback);
                            }
                            if (consumerRecords == null) {
                                try {
                                    consumerRecords = kafkaConsumer.poll(this.pollTimeout);
                                } catch (WakeupException e) {
                                }
                            }
                            try {
                                handover.produce(consumerRecords);
                                consumerRecords = null;
                            } catch (Handover.WakeupException e2) {
                            }
                        }
                    } catch (Throwable th2) {
                        handover.reportError(th2);
                        handover.close();
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th3) {
                            this.log.warn("Error while closing Kafka consumer", th3);
                        }
                    }
                } finally {
                    handover.close();
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th4) {
                        this.log.warn("Error while closing Kafka consumer", th4);
                    }
                }
            } catch (Throwable th5) {
                handover.reportError(th5);
            }
        }
    }

    public void shutdown() {
        this.running = false;
        this.handover.wakeupProducer();
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        if (this.nextOffsetsToCommit.getAndSet(map) != null) {
            this.log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.");
        }
        this.handover.wakeupProducer();
        if (this.consumer != null) {
            this.consumer.wakeup();
        }
    }

    private static List<TopicPartition> convertKafkaPartitions(KafkaTopicPartitionState<TopicPartition>[] kafkaTopicPartitionStateArr) {
        ArrayList arrayList = new ArrayList(kafkaTopicPartitionStateArr.length);
        for (KafkaTopicPartitionState<TopicPartition> kafkaTopicPartitionState : kafkaTopicPartitionStateArr) {
            arrayList.add(kafkaTopicPartitionState.getKafkaPartitionHandle());
        }
        return arrayList;
    }
}
