package org.apache.seatunnel.connectors.seatunnel.kafka.source;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.seatunnel.connectors.seatunnel.kafka.source.fetch.KafkaSourceFetcherManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.class */
/**
 * Source reader for Kafka splits that additionally tracks, per checkpoint, the offsets that
 * should be committed back to Kafka once that checkpoint completes.
 *
 * <p>Offset tracking and committing only happen when
 * {@link KafkaSourceConfig#isCommitOnCheckpoint()} returns {@code true}; otherwise
 * {@link #snapshotState(long)} and {@link #notifyCheckpointComplete(long)} leave the
 * bookkeeping maps untouched.
 *
 * <p>Both bookkeeping maps are thread-safe collections ({@code synchronizedSortedMap} /
 * {@code ConcurrentHashMap}). NOTE(review): presumably because the commit callback runs on a
 * different thread than the checkpoint calls - confirm against KafkaSourceFetcherManager.
 */
public class KafkaSourceReader extends SingleThreadMultiplexSourceReaderBase<ConsumerRecord<byte[], byte[]>, SeaTunnelRow, KafkaSourceSplit, KafkaSourceSplitState> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSourceReader.class);

    /** Reader context handed in at construction; stored but not read anywhere in this class. */
    private final SourceReader.Context context;

    /** Connector configuration; consulted for the commit-on-checkpoint switch. */
    private final KafkaSourceConfig kafkaSourceConfig;

    /** Checkpoint id -> offsets recorded for that checkpoint, ordered by checkpoint id. */
    private final SortedMap<Long, Map<TopicPartition, OffsetAndMetadata>> checkpointOffsetMap;

    /** Offsets of splits that have finished, kept until a successful commit covers their partition. */
    private final ConcurrentMap<TopicPartition, OffsetAndMetadata> offsetsOfFinishedSplits;

    /**
     * Creates the reader and initialises empty offset bookkeeping.
     *
     * <p>The first five arguments are forwarded unchanged to the base class constructor.
     * (The decompiler reports that the original access level of this constructor was
     * package-private.)
     *
     * @param blockingQueue queue passed to the base reader
     * @param singleThreadFetcherManager fetcher manager passed to the base reader
     * @param recordEmitter record emitter passed to the base reader
     * @param sourceReaderOptions reader options passed to the base reader
     * @param kafkaSourceConfig Kafka source configuration
     * @param context source reader context
     */
    public KafkaSourceReader(BlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> blockingQueue, SingleThreadFetcherManager<ConsumerRecord<byte[], byte[]>, KafkaSourceSplit> singleThreadFetcherManager, RecordEmitter<ConsumerRecord<byte[], byte[]>, SeaTunnelRow, KafkaSourceSplitState> recordEmitter, SourceReaderOptions sourceReaderOptions, KafkaSourceConfig kafkaSourceConfig, SourceReader.Context context) {
        super(blockingQueue, singleThreadFetcherManager, recordEmitter, sourceReaderOptions, context);
        this.kafkaSourceConfig = kafkaSourceConfig;
        this.context = context;
        this.checkpointOffsetMap = Collections.synchronizedSortedMap(new TreeMap<>());
        this.offsetsOfFinishedSplits = new ConcurrentHashMap<>();
    }

    /**
     * Remembers an offset for every finished split so it can still be committed with a later
     * checkpoint.
     *
     * <p>The split's current offset is used when it is greater than zero; otherwise its end
     * offset is used when that is greater than zero. A split for which neither is positive
     * is not recorded.
     *
     * @param finishedSplits states of the finished splits; the map keys are not used here
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase
    protected void onSplitFinished(Map<String, KafkaSourceSplitState> finishedSplits) {
        finishedSplits.forEach((ignored, splitState) -> {
            if (splitState.getCurrentOffset() > 0) {
                this.offsetsOfFinishedSplits.put(splitState.getTopicPartition(), new OffsetAndMetadata(splitState.getCurrentOffset()));
            } else if (splitState.getEndOffset() > 0) {
                this.offsetsOfFinishedSplits.put(splitState.getTopicPartition(), new OffsetAndMetadata(splitState.getEndOffset()));
            }
        });
    }

    /**
     * Wraps a newly assigned split in its mutable state object.
     *
     * @param kafkaSourceSplit the split to wrap
     * @return a new {@link KafkaSourceSplitState} built from the split
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase
    public KafkaSourceSplitState initializedState(KafkaSourceSplit kafkaSourceSplit) {
        return new KafkaSourceSplitState(kafkaSourceSplit);
    }

    /**
     * Converts a split state back into a split.
     *
     * @param splitId unused by this implementation (presumably the split id - verify against
     *     the base class)
     * @param kafkaSourceSplitState the state to convert
     * @return the result of {@link KafkaSourceSplitState#toKafkaSourceSplit()}
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase
    public KafkaSourceSplit toSplitType(String splitId, KafkaSourceSplitState kafkaSourceSplitState) {
        return kafkaSourceSplitState.toKafkaSourceSplit();
    }

    /**
     * Takes the base reader's snapshot and, when commit-on-checkpoint is enabled, records the
     * offsets to commit for this checkpoint.
     *
     * <p>Recorded offsets are the start offset of every snapshotted split whose start offset
     * is non-negative, plus all offsets currently held for finished splits. If there are
     * neither snapshotted splits nor finished-split offsets, an empty map is recorded for the
     * checkpoint.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the split list produced by the base class, unmodified
     */
    @Override // org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase
    public List<KafkaSourceSplit> snapshotState(long checkpointId) {
        List<KafkaSourceSplit> splits = super.snapshotState(checkpointId);
        if (!this.kafkaSourceConfig.isCommitOnCheckpoint()) {
            return splits;
        }
        if (splits.isEmpty() && this.offsetsOfFinishedSplits.isEmpty()) {
            logger.debug("checkpoint {} does not have an offset to submit for splits", checkpointId);
            this.checkpointOffsetMap.put(checkpointId, Collections.emptyMap());
            return splits;
        }
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = this.checkpointOffsetMap.computeIfAbsent(checkpointId, id -> new HashMap<>());
        for (KafkaSourceSplit split : splits) {
            if (split.getStartOffset() >= 0) {
                offsetsToCommit.put(split.getTopicPartition(), new OffsetAndMetadata(split.getStartOffset()));
            }
        }
        // Finished splits are added last, so their offsets win for a partition present in both.
        offsetsToCommit.putAll(this.offsetsOfFinishedSplits);
        return splits;
    }

    /**
     * Commits the offsets recorded for a checkpoint through the fetcher manager.
     *
     * <p>Does nothing when commit-on-checkpoint is disabled or when no entry is recorded for
     * the checkpoint. When the recorded entry is empty, bookkeeping up to this checkpoint is
     * pruned immediately. Otherwise the offsets are handed to
     * {@link KafkaSourceFetcherManager}; when the callback reports no error, finished-split
     * offsets for the committed partitions are dropped and bookkeeping is pruned. When it
     * reports an error, a warning is logged and all bookkeeping is kept.
     *
     * @param checkpointId id of the completed checkpoint
     */
    public void notifyCheckpointComplete(long checkpointId) {
        logger.debug("Committing offsets for checkpoint {}", checkpointId);
        if (!this.kafkaSourceConfig.isCommitOnCheckpoint()) {
            logger.debug("Submitting offsets after snapshot completion is prohibited");
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> committedPartitions = this.checkpointOffsetMap.get(checkpointId);
        if (committedPartitions == null) {
            logger.debug("Offsets for checkpoint {} have already been committed.", checkpointId);
            return;
        }
        if (committedPartitions.isEmpty()) {
            logger.debug("There are no offsets to commit for checkpoint {}.", checkpointId);
            removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
            return;
        }
        ((KafkaSourceFetcherManager) this.splitFetcherManager).commitOffsets(committedPartitions, (ignored, error) -> {
            if (error != null) {
                logger.warn("Failed to commit consumer offsets for checkpoint {}", checkpointId, error);
                return;
            }
            // Forget finished-split offsets for every partition this commit covered.
            this.offsetsOfFinishedSplits.keySet().removeIf(committedPartitions::containsKey);
            removeAllOffsetsToCommitUpToCheckpoint(checkpointId);
        });
    }

    /**
     * Drops the recorded offsets of every checkpoint whose id is less than or equal to
     * {@code checkpointId}.
     *
     * <p>The removal runs while holding the synchronized map's own monitor, so the whole
     * pruning pass is a single atomic step with respect to other operations on the map.
     */
    private void removeAllOffsetsToCommitUpToCheckpoint(long checkpointId) {
        synchronized (this.checkpointOffsetMap) {
            this.checkpointOffsetMap.keySet().removeIf(id -> id <= checkpointId);
        }
    }
}
