/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source.reader;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader;
import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplitState;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source reader for Kafka partition splits that records, per checkpoint, the offsets that
 * should be committed back to Kafka and commits them once that checkpoint is reported complete.
 *
 * <p>Offsets of splits that have already finished are kept separately so they are still
 * included in later snapshots until a commit containing them has succeeded.
 *
 * @param <T> type of the records emitted by this reader
 */
public class KafkaSourceReader<T>
        extends SingleThreadMultiplexSourceReaderBase<
                Tuple3<T, Long, Long>, T, KafkaPartitionSplit, KafkaPartitionSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceReader.class);

    /** Offsets to commit, keyed by the checkpoint id whose snapshot produced them. */
    private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit =
            Collections.synchronizedSortedMap(new TreeMap<>());

    /** Last known offsets of splits that finished and have not yet been committed. */
    private final ConcurrentMap<TopicPartition, OffsetAndMetadata> offsetsOfFinishedSplits =
            new ConcurrentHashMap<>();

    /**
     * Creates the reader and wires a {@link KafkaSourceFetcherManager} around the given queue
     * and split reader supplier.
     *
     * @param elementsQueue queue shared between the fetcher manager and the reader base
     * @param splitReaderSupplier factory for the split readers used by the fetcher manager
     * @param recordEmitter emitter handed to the reader base
     * @param config configuration handed to the reader base
     * @param context source reader context handed to the reader base
     */
    public KafkaSourceReader(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<Tuple3<T, Long, Long>>> elementsQueue,
            Supplier<KafkaPartitionSplitReader<T>> splitReaderSupplier,
            RecordEmitter<Tuple3<T, Long, Long>, T, KafkaPartitionSplitState> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                elementsQueue,
                new KafkaSourceFetcherManager<T>(elementsQueue, splitReaderSupplier::get),
                recordEmitter,
                config,
                context);
    }

    /**
     * Remembers the current offset of every finished split so that it is included in
     * subsequent snapshots.
     *
     * @param finishedSplitIds finished splits, keyed by split id
     */
    protected void onSplitFinished(Map<String, KafkaPartitionSplitState> finishedSplitIds) {
        finishedSplitIds.forEach(
                (ignored, splitState) ->
                        this.offsetsOfFinishedSplits.put(
                                splitState.getTopicPartition(),
                                new OffsetAndMetadata(splitState.getCurrentOffset())));
    }

    /**
     * Snapshots the reader state and records the offsets to commit for this checkpoint.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the splits returned by the base class snapshot
     */
    public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
        List<KafkaPartitionSplit> splits = super.snapshotState(checkpointId);
        if (splits.isEmpty() && this.offsetsOfFinishedSplits.isEmpty()) {
            this.offsetsToCommit.put(checkpointId, Collections.emptyMap());
        } else {
            Map<TopicPartition, OffsetAndMetadata> offsetsMap =
                    this.offsetsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
            // Offsets of the splits that are still being read.
            for (KafkaPartitionSplit split : splits) {
                long startingOffset = split.getStartingOffset();
                // OffsetAndMetadata rejects negative offsets with an IllegalArgumentException,
                // so a split without a concrete (non-negative) position is skipped rather than
                // failing the whole snapshot.
                if (startingOffset >= 0) {
                    offsetsMap.put(
                            split.getTopicPartition(),
                            new OffsetAndMetadata(startingOffset, null));
                }
            }
            // Offsets of splits that finished but have not been committed yet.
            offsetsMap.putAll(this.offsetsOfFinishedSplits);
        }
        return splits;
    }

    /**
     * Commits the offsets recorded for the completed checkpoint. On success, the committed
     * finished-split offsets and all entries up to and including this checkpoint are dropped.
     *
     * @param checkpointId id of the checkpoint that completed
     * @throws Exception declared by the overridden contract
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.info("Committing offsets for checkpoint {}", checkpointId);
        Map<TopicPartition, OffsetAndMetadata> committedPartitions =
                this.offsetsToCommit.get(checkpointId);
        if (committedPartitions == null) {
            // No snapshot was recorded for this id, or a commit for a later checkpoint has
            // already pruned it; there is nothing to commit.
            LOG.debug(
                    "Offsets for checkpoint {} either do not exist or have already been committed.",
                    checkpointId);
            return;
        }
        ((KafkaSourceFetcherManager<T>) this.splitFetcherManager)
                .commitOffsets(
                        committedPartitions,
                        (ignored, e) -> {
                            if (e != null) {
                                LOG.warn(
                                        "Failed to commit consumer offsets for checkpoint {}",
                                        checkpointId,
                                        e);
                            } else {
                                LOG.debug(
                                        "Successfully committed offsets for checkpoint {}",
                                        checkpointId);
                                // Use the map captured above: the entry may have been removed
                                // from offsetsToCommit by the time this callback runs.
                                this.offsetsOfFinishedSplits
                                        .entrySet()
                                        .removeIf(
                                                entry ->
                                                        committedPartitions.containsKey(
                                                                entry.getKey()));
                                while (!this.offsetsToCommit.isEmpty()
                                        && this.offsetsToCommit.firstKey() <= checkpointId) {
                                    this.offsetsToCommit.remove(this.offsetsToCommit.firstKey());
                                }
                            }
                        });
    }

    /**
     * Wraps a newly assigned split in its mutable state object.
     *
     * @param split the assigned split
     * @return a new state object created from the split
     */
    protected KafkaPartitionSplitState initializedState(KafkaPartitionSplit split) {
        return new KafkaPartitionSplitState(split);
    }

    /**
     * Converts a split state back into a split.
     *
     * @param splitId id of the split (unused here)
     * @param splitState state to convert
     * @return the split produced by the state object
     */
    protected KafkaPartitionSplit toSplitType(String splitId, KafkaPartitionSplitState splitState) {
        return splitState.toKafkaPartitionSplit();
    }

    /** Returns the live per-checkpoint offsets map (not a copy); intended for tests. */
    @VisibleForTesting
    SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> getOffsetsToCommit() {
        return this.offsetsToCommit;
    }

    /** Returns the number of alive fetchers reported by the fetcher manager; intended for tests. */
    @VisibleForTesting
    int getNumAliveFetchers() {
        return this.splitFetcherManager.getNumAliveFetchers();
    }
}

