/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.source.reader;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.WakeupException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link SplitReader} that reads {@link KafkaPartitionSplit}s with a single manually-assigned
 * {@link KafkaConsumer}.
 *
 * <p>Each split maps to one {@link TopicPartition}. The reader keeps a per-partition stopping
 * offset (absent means "no stopping offset", represented as {@link Long#MAX_VALUE}). A split is
 * reported as finished either when a fetched batch reaches its stopping offset, or immediately
 * after assignment when the consumer position is already at/after the stopping offset ("empty
 * split").
 */
public class KafkaPartitionSplitReader
        implements SplitReader<ConsumerRecord<byte[], byte[]>, KafkaPartitionSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaPartitionSplitReader.class);

    /** Maximum time, in milliseconds, a single {@code poll} call in {@link #fetch()} may block. */
    private static final long POLL_TIMEOUT = 10000L;

    private final KafkaConsumer<byte[], byte[]> consumer;

    /** Resolved (non-negative) stopping offsets of the partitions handed to this reader. */
    private final Map<TopicPartition, Long> stoppingOffsets;

    private final String groupId;
    private final int subtaskId;
    private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;

    /**
     * Ids of splits found to be empty while handling a splits change. They are reported as
     * finished by the next {@link #fetch()} call and then cleared.
     */
    private final Set<String> emptySplits = new HashSet<>();

    /**
     * Creates the reader and its underlying Kafka consumer.
     *
     * @param props consumer properties; they are copied, and {@code client.id} is overridden with
     *     {@code <client id prefix>-<subtask index>}
     * @param context reader context, used to obtain the subtask index
     * @param kafkaSourceReaderMetrics metrics facade the consumer is registered with
     */
    public KafkaPartitionSplitReader(Properties props, SourceReaderContext context, KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
        this.subtaskId = context.getIndexOfSubtask();
        this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;

        // Work on a copy so the caller's Properties object is never mutated.
        Properties consumerProps = new Properties();
        consumerProps.putAll(props);
        consumerProps.setProperty("client.id", createConsumerClientId(props));

        this.consumer = new KafkaConsumer<>(consumerProps);
        this.stoppingOffsets = new HashMap<>();
        this.groupId = consumerProps.getProperty("group.id");

        maybeRegisterKafkaConsumerMetrics(props, kafkaSourceReaderMetrics, this.consumer);
        this.kafkaSourceReaderMetrics.registerNumBytesIn(this.consumer);
    }

    /**
     * Polls the consumer once (blocking up to {@link #POLL_TIMEOUT} ms) and wraps the result.
     *
     * <p>Partitions whose last fetched record reaches {@code stoppingOffset - 1} are marked as
     * finished in the returned container and unassigned from the consumer. Pending empty splits
     * are also reported as finished.
     *
     * @return the fetched records grouped by split, plus the ids of splits that finished
     * @throws IOException declared by the interface; not thrown by this implementation directly
     */
    public RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> fetch() throws IOException {
        ConsumerRecords<byte[], byte[]> consumerRecords;
        try {
            consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
        } catch (IllegalStateException | WakeupException e) {
            // WakeupException: poll was interrupted via wakeUp().
            // IllegalStateException: per the KafkaConsumer contract, poll fails this way when no
            // partition is assigned, e.g. after every assigned split turned out to be empty.
            // In both cases return an empty batch that still reports pending empty splits.
            KafkaPartitionSplitRecords emptyBatch =
                    new KafkaPartitionSplitRecords(ConsumerRecords.empty(), kafkaSourceReaderMetrics);
            markEmptySplitsAsFinished(emptyBatch);
            return emptyBatch;
        }

        KafkaPartitionSplitRecords recordsBySplits =
                new KafkaPartitionSplitRecords(consumerRecords, kafkaSourceReaderMetrics);
        List<TopicPartition> finishedPartitions = new ArrayList<>();
        for (TopicPartition tp : consumerRecords.partitions()) {
            long stoppingOffset = getStoppingOffset(tp);
            List<ConsumerRecord<byte[], byte[]>> recordsFromPartition = consumerRecords.records(tp);
            if (!recordsFromPartition.isEmpty()) {
                ConsumerRecord<byte[], byte[]> lastRecord =
                        recordsFromPartition.get(recordsFromPartition.size() - 1);
                // The stopping offset is exclusive: the last record to emit has offset
                // stoppingOffset - 1. Records at/after the stopping offset that are part of
                // this batch are filtered out later in nextRecordFromSplit().
                if (lastRecord.offset() >= stoppingOffset - 1L) {
                    recordsBySplits.setPartitionStoppingOffset(tp, stoppingOffset);
                    finishSplitAtRecord(tp, stoppingOffset, lastRecord.offset(), finishedPartitions, recordsBySplits);
                }
            }
            kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);
        }

        markEmptySplitsAsFinished(recordsBySplits);

        if (!finishedPartitions.isEmpty()) {
            finishedPartitions.forEach(kafkaSourceReaderMetrics::removeRecordsLagMetric);
            unassignPartitions(finishedPartitions);
        }

        kafkaSourceReaderMetrics.updateNumBytesInCounter();
        return recordsBySplits;
    }

    /** Moves all pending empty split ids into the finished set of the given batch. */
    private void markEmptySplitsAsFinished(KafkaPartitionSplitRecords recordsBySplits) {
        if (!emptySplits.isEmpty()) {
            recordsBySplits.finishedSplits.addAll(emptySplits);
            emptySplits.clear();
        }
    }

    /**
     * Assigns the partitions of newly added splits to the consumer, seeks them to their starting
     * offsets, resolves their stopping offsets and unassigns splits that are already empty.
     *
     * @param splitsChange the change to apply; only {@link SplitsAddition} is supported
     * @throws UnsupportedOperationException if the change is not a {@link SplitsAddition}
     */
    public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }

        List<TopicPartition> newPartitionAssignments = new ArrayList<>();
        Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets = new HashMap<>();
        List<TopicPartition> partitionsStartingFromEarliest = new ArrayList<>();
        List<TopicPartition> partitionsStartingFromLatest = new ArrayList<>();
        List<TopicPartition> partitionsStoppingAtLatest = new ArrayList<>();
        Set<TopicPartition> partitionsStoppingAtCommitted = new HashSet<>();

        for (KafkaPartitionSplit split : splitsChange.splits()) {
            newPartitionAssignments.add(split.getTopicPartition());
            parseStartingOffsets(split, partitionsStartingFromEarliest, partitionsStartingFromLatest, partitionsStartingFromSpecifiedOffsets);
            parseStoppingOffsets(split, partitionsStoppingAtLatest, partitionsStoppingAtCommitted);
            kafkaSourceReaderMetrics.registerTopicPartition(split.getTopicPartition());
        }

        // assign() replaces the whole assignment, so the existing partitions must be re-included.
        newPartitionAssignments.addAll(consumer.assignment());
        consumer.assign(newPartitionAssignments);

        seekToStartingOffsets(partitionsStartingFromEarliest, partitionsStartingFromLatest, partitionsStartingFromSpecifiedOffsets);
        acquireAndSetStoppingOffsets(partitionsStoppingAtLatest, partitionsStoppingAtCommitted);
        removeEmptySplits();
        maybeLogSplitChangesHandlingResult(splitsChange);
    }

    /** Wakes up the consumer so that a blocking call such as {@code poll} returns early. */
    public void wakeUp() {
        consumer.wakeup();
    }

    /** Closes the underlying Kafka consumer. */
    public void close() throws Exception {
        consumer.close();
    }

    /**
     * Asynchronously commits the given offsets through the underlying consumer.
     *
     * @param offsetsToCommit offsets to commit, keyed by partition
     * @param offsetCommitCallback callback handed to {@code commitAsync}
     */
    public void notifyCheckpointComplete(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit, OffsetCommitCallback offsetCommitCallback) {
        consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
    }

    /**
     * Sorts the split's partition into one of the starting-offset buckets.
     *
     * <p>Sentinel {@code -2} selects "earliest", {@code -1} selects "latest"; any other value
     * except {@code -3} is treated as an explicit offset to seek to. For {@code -3} no seek is
     * recorded, leaving the consumer's own position resolution in charge.
     */
    private void parseStartingOffsets(KafkaPartitionSplit split, List<TopicPartition> partitionsStartingFromEarliest, List<TopicPartition> partitionsStartingFromLatest, Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
        TopicPartition tp = split.getTopicPartition();
        long startingOffset = split.getStartingOffset();
        if (startingOffset == -2L) {
            partitionsStartingFromEarliest.add(tp);
        } else if (startingOffset == -1L) {
            partitionsStartingFromLatest.add(tp);
        } else if (startingOffset != -3L) {
            partitionsStartingFromSpecifiedOffsets.put(tp, startingOffset);
        }
    }

    /**
     * Records the split's stopping offset, if it has one.
     *
     * <p>Non-negative values are stored directly. Sentinel {@code -1} defers to the partition's
     * end offset and {@code -3} to the group's committed offset; both are resolved later in
     * {@link #acquireAndSetStoppingOffsets}.
     *
     * @throws FlinkRuntimeException if the stopping offset is any other negative value
     */
    private void parseStoppingOffsets(KafkaPartitionSplit split, List<TopicPartition> partitionsStoppingAtLatest, Set<TopicPartition> partitionsStoppingAtCommitted) {
        TopicPartition tp = split.getTopicPartition();
        split.getStoppingOffset().ifPresent(stoppingOffset -> {
            if (stoppingOffset >= 0L) {
                stoppingOffsets.put(tp, stoppingOffset);
            } else if (stoppingOffset == -1L) {
                partitionsStoppingAtLatest.add(tp);
            } else if (stoppingOffset == -3L) {
                partitionsStoppingAtCommitted.add(tp);
            } else {
                throw new FlinkRuntimeException(String.format("Invalid stopping offset %d for partition %s", stoppingOffset, tp));
            }
        });
    }

    /** Seeks each non-empty bucket of partitions to its requested starting position. */
    private void seekToStartingOffsets(List<TopicPartition> partitionsStartingFromEarliest, List<TopicPartition> partitionsStartingFromLatest, Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
        if (!partitionsStartingFromEarliest.isEmpty()) {
            LOG.trace("Seeking starting offsets to beginning: {}", partitionsStartingFromEarliest);
            consumer.seekToBeginning(partitionsStartingFromEarliest);
        }
        if (!partitionsStartingFromLatest.isEmpty()) {
            LOG.trace("Seeking starting offsets to end: {}", partitionsStartingFromLatest);
            consumer.seekToEnd(partitionsStartingFromLatest);
        }
        if (!partitionsStartingFromSpecifiedOffsets.isEmpty()) {
            LOG.trace("Seeking starting offsets to specified offsets: {}", partitionsStartingFromSpecifiedOffsets);
            partitionsStartingFromSpecifiedOffsets.forEach(consumer::seek);
        }
    }

    /**
     * Resolves the deferred stopping offsets ("latest" and "committed") to concrete values and
     * stores them in {@link #stoppingOffsets}.
     *
     * @throws NullPointerException (via {@code Preconditions.checkNotNull}) if a partition should
     *     stop at its committed offset but the group has none for it
     */
    private void acquireAndSetStoppingOffsets(List<TopicPartition> partitionsStoppingAtLatest, Set<TopicPartition> partitionsStoppingAtCommitted) {
        // Skip the broker lookups entirely when there is nothing to resolve.
        if (!partitionsStoppingAtLatest.isEmpty()) {
            stoppingOffsets.putAll(consumer.endOffsets(partitionsStoppingAtLatest));
        }
        if (!partitionsStoppingAtCommitted.isEmpty()) {
            Map<TopicPartition, OffsetAndMetadata> committedOffsets =
                    retryOnWakeup(
                            () -> consumer.committed(partitionsStoppingAtCommitted),
                            "getting committed offset as stopping offsets");
            committedOffsets.forEach((tp, offsetAndMetadata) -> {
                Preconditions.checkNotNull(offsetAndMetadata, String.format("Partition %s should stop at committed offset. But there is no committed offset of this partition for group %s", tp, groupId));
                stoppingOffsets.put(tp, offsetAndMetadata.offset());
            });
        }
    }

    /**
     * Finds assigned partitions whose current position is already at or beyond their stopping
     * offset, remembers their split ids in {@link #emptySplits} and unassigns them.
     */
    private void removeEmptySplits() {
        List<TopicPartition> emptyPartitions = new ArrayList<>();
        for (TopicPartition tp : consumer.assignment()) {
            long position =
                    retryOnWakeup(
                            () -> consumer.position(tp),
                            "getting starting offset to check if split is empty");
            if (position >= getStoppingOffset(tp)) {
                emptyPartitions.add(tp);
            }
        }
        if (!emptyPartitions.isEmpty()) {
            LOG.debug("These assigning splits are empty and will be marked as finished in later fetch: {}", emptyPartitions);
            emptySplits.addAll(emptyPartitions.stream().map(KafkaPartitionSplit::toSplitId).collect(Collectors.toSet()));
            unassignPartitions(emptyPartitions);
        }
    }

    /**
     * At DEBUG level, logs the effective start position and stopping offset of every split of the
     * change that is still assigned to the consumer.
     */
    private void maybeLogSplitChangesHandlingResult(SplitsChange<KafkaPartitionSplit> splitsChange) {
        if (!LOG.isDebugEnabled()) {
            return;
        }
        StringJoiner splitsInfo = new StringJoiner(",");
        Set<TopicPartition> currentAssignment = consumer.assignment();
        for (KafkaPartitionSplit split : splitsChange.splits()) {
            TopicPartition tp = split.getTopicPartition();
            // Splits detected as empty were already unassigned by removeEmptySplits();
            // KafkaConsumer#position throws IllegalStateException for unassigned partitions,
            // so they must be skipped or enabling debug logging would fail the split change.
            if (!currentAssignment.contains(tp)) {
                continue;
            }
            long startingOffset = retryOnWakeup(() -> consumer.position(tp), "logging starting position");
            long stoppingOffset = getStoppingOffset(tp);
            splitsInfo.add(String.format("[%s, start:%d, stop: %d]", tp, startingOffset, stoppingOffset));
        }
        LOG.debug("SplitsChange handling result: {}", splitsInfo);
    }

    /** Re-assigns the consumer to its current assignment minus the given partitions. */
    private void unassignPartitions(Collection<TopicPartition> partitionsToUnassign) {
        Set<TopicPartition> newAssignment = new HashSet<>(consumer.assignment());
        newAssignment.removeAll(partitionsToUnassign);
        consumer.assign(newAssignment);
    }

    /** Builds the consumer client id as {@code <configured client id prefix>-<subtask index>}. */
    private String createConsumerClientId(Properties props) {
        String prefix = props.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key());
        return prefix + "-" + subtaskId;
    }

    /** Logs that {@code tp} reached its stopping offset and records its split as finished. */
    private void finishSplitAtRecord(TopicPartition tp, long stoppingOffset, long currentOffset, List<TopicPartition> finishedPartitions, KafkaPartitionSplitRecords recordsBySplits) {
        LOG.debug("{} has reached stopping offset {}, current offset is {}", tp, stoppingOffset, currentOffset);
        finishedPartitions.add(tp);
        recordsBySplits.addFinishedSplit(KafkaPartitionSplit.toSplitId(tp));
    }

    /** Returns the stopping offset of {@code tp}, or {@link Long#MAX_VALUE} if it has none. */
    private long getStoppingOffset(TopicPartition tp) {
        return stoppingOffsets.getOrDefault(tp, Long.MAX_VALUE);
    }

    /** Registers the consumer's own metrics if the corresponding source option is enabled. */
    private void maybeRegisterKafkaConsumerMetrics(Properties props, KafkaSourceReaderMetrics kafkaSourceReaderMetrics, KafkaConsumer<?, ?> consumer) {
        Boolean needToRegister = KafkaSourceOptions.getOption(props, KafkaSourceOptions.REGISTER_KAFKA_CONSUMER_METRICS, Boolean::parseBoolean);
        if (needToRegister) {
            kafkaSourceReaderMetrics.registerKafkaConsumerMetrics(consumer);
        }
    }

    /**
     * Runs a consumer call and retries it exactly once if it is interrupted by a
     * {@link WakeupException}. A second {@link WakeupException} propagates to the caller.
     *
     * @param consumerCall the call to execute
     * @param description human-readable purpose of the call, used in the log message
     * @return the result of the (possibly retried) call
     */
    private <V> V retryOnWakeup(Supplier<V> consumerCall, String description) {
        try {
            return consumerCall.get();
        } catch (WakeupException we) {
            LOG.info("Caught WakeupException while executing Kafka consumer call for {}. Will retry the consumer call.", description);
            return consumerCall.get();
        }
    }

    /**
     * {@link RecordsWithSplitIds} view over one {@link ConsumerRecords} batch.
     *
     * <p>Iterates partition by partition; within a partition, iteration ends as soon as a record
     * at or beyond that partition's stopping offset is encountered.
     */
    private static class KafkaPartitionSplitRecords
            implements RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>> {
        private final Set<String> finishedSplits = new HashSet<>();

        /** Stopping offsets of the partitions that finish within this batch. */
        private final Map<TopicPartition, Long> stoppingOffsets = new HashMap<>();

        private final ConsumerRecords<byte[], byte[]> consumerRecords;
        private final KafkaSourceReaderMetrics metrics;
        private final Iterator<TopicPartition> splitIterator;
        private Iterator<ConsumerRecord<byte[], byte[]>> recordIterator;
        private TopicPartition currentTopicPartition;
        private Long currentSplitStoppingOffset;

        private KafkaPartitionSplitRecords(ConsumerRecords<byte[], byte[]> consumerRecords, KafkaSourceReaderMetrics metrics) {
            this.consumerRecords = consumerRecords;
            this.splitIterator = consumerRecords.partitions().iterator();
            this.metrics = metrics;
        }

        private void setPartitionStoppingOffset(TopicPartition topicPartition, long stoppingOffset) {
            stoppingOffsets.put(topicPartition, stoppingOffset);
        }

        private void addFinishedSplit(String splitId) {
            finishedSplits.add(splitId);
        }

        /**
         * Advances to the next partition of the batch.
         *
         * @return the string form of the next partition, or {@code null} when the batch is
         *     exhausted
         */
        @Nullable
        public String nextSplit() {
            if (!splitIterator.hasNext()) {
                currentTopicPartition = null;
                recordIterator = null;
                currentSplitStoppingOffset = null;
                return null;
            }
            currentTopicPartition = splitIterator.next();
            recordIterator = consumerRecords.records(currentTopicPartition).iterator();
            currentSplitStoppingOffset = stoppingOffsets.getOrDefault(currentTopicPartition, Long.MAX_VALUE);
            return currentTopicPartition.toString();
        }

        /**
         * Returns the next record of the current partition and reports its offset to the
         * metrics.
         *
         * @return the next record, or {@code null} if the partition has no more records or the
         *     next record's offset is at/after the stopping offset
         * @throws NullPointerException if no current split is selected
         */
        @Nullable
        public ConsumerRecord<byte[], byte[]> nextRecordFromSplit() {
            Preconditions.checkNotNull(currentTopicPartition, "Make sure nextSplit() did not return null before iterate over the records split.");
            if (recordIterator.hasNext()) {
                ConsumerRecord<byte[], byte[]> record = recordIterator.next();
                if (record.offset() < currentSplitStoppingOffset) {
                    metrics.recordCurrentOffset(currentTopicPartition, record.offset());
                    return record;
                }
            }
            return null;
        }

        /** Returns the ids of the splits that finished with this batch. */
        public Set<String> finishedSplits() {
            return finishedSplits;
        }
    }
}

