package org.apache.flink.connector.kafka.source.reader;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connector/kafka/source/reader/KafkaSourceReader.class */
public class KafkaSourceReader<T> extends SingleThreadMultiplexSourceReaderBase<Tuple3<T, Long, Long>, T, KafkaPartitionSplit, KafkaPartitionSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceReader.class);
    private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit;
    private final ConcurrentMap<TopicPartition, OffsetAndMetadata> offsetsOfFinishedSplits;
    private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;
    private final boolean commitOffsetsOnCheckpoint;

    public KafkaSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, Long>>> futureCompletingBlockingQueue, KafkaSourceFetcherManager<T> kafkaSourceFetcherManager, RecordEmitter<Tuple3<T, Long, Long>, T, KafkaPartitionSplitState> recordEmitter, Configuration configuration, SourceReaderContext sourceReaderContext, KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
        super(futureCompletingBlockingQueue, kafkaSourceFetcherManager, recordEmitter, configuration, sourceReaderContext);
        this.offsetsToCommit = Collections.synchronizedSortedMap(new TreeMap());
        this.offsetsOfFinishedSplits = new ConcurrentHashMap();
        this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
        this.commitOffsetsOnCheckpoint = ((Boolean) configuration.get(KafkaSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT)).booleanValue();
        if (this.commitOffsetsOnCheckpoint) {
            return;
        }
        LOG.warn("Offset commit on checkpoint is disabled. Consuming offset will not be reported back to Kafka cluster.");
    }

    protected void onSplitFinished(Map<String, KafkaPartitionSplitState> map) {
        map.forEach((str, kafkaPartitionSplitState) -> {
            if (kafkaPartitionSplitState.getCurrentOffset() >= 0) {
                this.offsetsOfFinishedSplits.put(kafkaPartitionSplitState.getTopicPartition(), new OffsetAndMetadata(kafkaPartitionSplitState.getCurrentOffset()));
            }
        });
    }

    public List<KafkaPartitionSplit> snapshotState(long j) {
        List<KafkaPartitionSplit> snapshotState = super.snapshotState(j);
        if (!this.commitOffsetsOnCheckpoint) {
            return snapshotState;
        }
        if (snapshotState.isEmpty() && this.offsetsOfFinishedSplits.isEmpty()) {
            this.offsetsToCommit.put(Long.valueOf(j), Collections.emptyMap());
        } else {
            Map<TopicPartition, OffsetAndMetadata> computeIfAbsent = this.offsetsToCommit.computeIfAbsent(Long.valueOf(j), l -> {
                return new HashMap();
            });
            for (KafkaPartitionSplit kafkaPartitionSplit : snapshotState) {
                if (kafkaPartitionSplit.getStartingOffset() >= 0) {
                    computeIfAbsent.put(kafkaPartitionSplit.getTopicPartition(), new OffsetAndMetadata(kafkaPartitionSplit.getStartingOffset()));
                }
            }
            computeIfAbsent.putAll(this.offsetsOfFinishedSplits);
        }
        return snapshotState;
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        LOG.debug("Committing offsets for checkpoint {}", Long.valueOf(j));
        if (this.commitOffsetsOnCheckpoint) {
            this.splitFetcherManager.commitOffsets(this.offsetsToCommit.get(Long.valueOf(j)), (map, exc) -> {
                if (exc != null) {
                    this.kafkaSourceReaderMetrics.recordFailedCommit();
                    LOG.warn("Failed to commit consumer offsets for checkpoint {}", Long.valueOf(j), exc);
                    return;
                }
                LOG.debug("Successfully committed offsets for checkpoint {}", Long.valueOf(j));
                this.kafkaSourceReaderMetrics.recordSucceededCommit();
                Map<TopicPartition, OffsetAndMetadata> map = this.offsetsToCommit.get(Long.valueOf(j));
                map.forEach((topicPartition, offsetAndMetadata) -> {
                    this.kafkaSourceReaderMetrics.recordCommittedOffset(topicPartition, offsetAndMetadata.offset());
                });
                this.offsetsOfFinishedSplits.entrySet().removeIf(entry -> {
                    return map.containsKey(entry.getKey());
                });
                while (!this.offsetsToCommit.isEmpty() && this.offsetsToCommit.firstKey().longValue() <= j) {
                    this.offsetsToCommit.remove(this.offsetsToCommit.firstKey());
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaPartitionSplitState initializedState(KafkaPartitionSplit kafkaPartitionSplit) {
        return new KafkaPartitionSplitState(kafkaPartitionSplit);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public KafkaPartitionSplit toSplitType(String str, KafkaPartitionSplitState kafkaPartitionSplitState) {
        return kafkaPartitionSplitState.toKafkaPartitionSplit();
    }

    @VisibleForTesting
    SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> getOffsetsToCommit() {
        return this.offsetsToCommit;
    }

    @VisibleForTesting
    int getNumAliveFetchers() {
        return this.splitFetcherManager.getNumAliveFetchers();
    }
}
