package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.class */
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements CheckpointListener, ResultTypeQueryable<T>, CheckpointedFunction, CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>> {
    /** Serial version UID of this class. */
    private static final long serialVersionUID = -6272159445203409112L;
    /** Logger of this class; protected, so it is also visible to subclasses. */
    protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
    /** Cap on the number of checkpoint entries kept in {@link #pendingOffsetsToCommit}; snapshotState() evicts the oldest entries above this count. */
    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
    /** Configuration key {@code "flink.disable-metrics"}; not read inside this class (presumably consumed by subclasses - TODO confirm). */
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    /** Topic names passed to the constructor; handed to getKafkaPartitions() when partitions are assigned. */
    private final List<String> topics;
    /** Deserialization schema passed to the constructor; also supplies the produced type. */
    protected final KeyedDeserializationSchema<T> deserializer;
    /** Partitions this instance reads; set by setSubscribedPartitions() or during open(), and required to be non-null by run(). */
    protected List<KafkaTopicPartition> subscribedPartitions;
    /** Serialized periodic watermark assigner, or null; cannot be combined with the punctuated assigner. */
    private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;
    /** Serialized punctuated watermark assigner, or null; cannot be combined with the periodic assigner. */
    private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;
    /** Operator list state holding (partition, offset) pairs; obtained in initializeState() and rewritten by snapshotState(). */
    private transient ListState<Tuple2<KafkaTopicPartition, Long>> offsetsStateForCheckpoint;
    /** Fetcher created by run(); null until then. Read by cancel(), snapshotState() and notifyCheckpointComplete(). */
    private volatile transient AbstractFetcher<T, ?> kafkaFetcher;
    /** Offsets to resume from, set by restoreState() or initializeState(); null when there is nothing to restore. */
    private volatile transient HashMap<KafkaTopicPartition, Long> restoreToOffset;
    /** Insertion-ordered map from checkpoint id (Long) to the offsets snapshotted for that checkpoint; consumed by notifyCheckpointComplete(). */
    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
    /** Cleared by cancel(); checked by run(), snapshotState() and notifyCheckpointComplete(). */
    private volatile boolean running = true;

    public FlinkKafkaConsumerBase(List<String> list, KeyedDeserializationSchema<T> keyedDeserializationSchema) {
        this.topics = (List) Preconditions.checkNotNull(list);
        Preconditions.checkArgument(list.size() > 0, "You have to define at least one topic.");
        this.deserializer = (KeyedDeserializationSchema) Preconditions.checkNotNull(keyedDeserializationSchema, "valueDeserializer");
    }

    /**
     * Replaces the subscribed partitions with a read-only view of the given list.
     *
     * @param list partitions to subscribe to; must not be null
     */
    protected void setSubscribedPartitions(List<KafkaTopicPartition> list) {
        final List<KafkaTopicPartition> partitions = Preconditions.checkNotNull(list);
        this.subscribedPartitions = Collections.unmodifiableList(partitions);
    }

    /**
     * Registers a punctuated timestamp/watermark assigner, stored in serialized form.
     *
     * @param assignerWithPunctuatedWatermarks the assigner to use; must not be null
     * @return this consumer, to allow call chaining
     * @throws IllegalStateException if a periodic assigner was registered before
     * @throws IllegalArgumentException if cleaning or serializing the assigner fails
     */
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assignerWithPunctuatedWatermarks) {
        Preconditions.checkNotNull(assignerWithPunctuatedWatermarks);

        final boolean periodicAlreadySet = this.periodicWatermarkAssigner != null;
        if (periodicAlreadySet) {
            throw new IllegalStateException("A periodic watermark emitter has already been set.");
        }

        SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedAssigner;
        try {
            ClosureCleaner.clean(assignerWithPunctuatedWatermarks, true);
            serializedAssigner = new SerializedValue<>(assignerWithPunctuatedWatermarks);
        } catch (Exception cause) {
            throw new IllegalArgumentException("The given assigner is not serializable", cause);
        }

        this.punctuatedWatermarkAssigner = serializedAssigner;
        return this;
    }

    /**
     * Registers a periodic timestamp/watermark assigner, stored in serialized form.
     *
     * @param assignerWithPeriodicWatermarks the assigner to use; must not be null
     * @return this consumer, to allow call chaining
     * @throws IllegalStateException if a punctuated assigner was registered before
     * @throws IllegalArgumentException if cleaning or serializing the assigner fails
     */
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assignerWithPeriodicWatermarks) {
        Preconditions.checkNotNull(assignerWithPeriodicWatermarks);

        final boolean punctuatedAlreadySet = this.punctuatedWatermarkAssigner != null;
        if (punctuatedAlreadySet) {
            throw new IllegalStateException("A punctuated watermark emitter has already been set.");
        }

        SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedAssigner;
        try {
            ClosureCleaner.clean(assignerWithPeriodicWatermarks, true);
            serializedAssigner = new SerializedValue<>(assignerWithPeriodicWatermarks);
        } catch (Exception cause) {
            throw new IllegalArgumentException("The given assigner is not serializable", cause);
        }

        this.periodicWatermarkAssigner = serializedAssigner;
        return this;
    }

    /**
     * Main work method of the source.
     *
     * <p>With no subscribed partitions, a {@code Long.MAX_VALUE} watermark is emitted and the
     * method idles until {@link #cancel()} clears the running flag. Otherwise a fetcher is
     * created, primed with restored offsets if there are any, published in {@code kafkaFetcher}
     * and run until it returns.
     *
     * @param sourceContext context used to emit records and watermarks
     * @throws Exception if the subscribed partitions were never set, or if creating or running
     *         the fetcher fails
     */
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        final List<KafkaTopicPartition> partitions = this.subscribedPartitions;
        if (partitions == null) {
            throw new Exception("The partitions were not set for the consumer");
        }

        if (partitions.isEmpty()) {
            // Nothing to read: emit the maximum watermark, then idle until cancelled.
            sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));

            final Object waitLock = new Object();
            while (this.running) {
                try {
                    synchronized (waitLock) {
                        waitLock.wait();
                    }
                } catch (InterruptedException e) {
                    if (!this.running) {
                        // Restore the interrupt flag; the loop condition ends the wait.
                        Thread.currentThread().interrupt();
                    }
                    // While still running, the interrupt is ignored and waiting resumes.
                }
            }
            return;
        }

        // getRuntimeContext() is declared to return the general RuntimeContext, while
        // createFetcher() requires the streaming variant, so an explicit cast is needed.
        final StreamingRuntimeContext runtimeContext = (StreamingRuntimeContext) getRuntimeContext();
        final AbstractFetcher<T, ?> fetcher = createFetcher(
                sourceContext,
                partitions,
                this.periodicWatermarkAssigner,
                this.punctuatedWatermarkAssigner,
                runtimeContext);

        final HashMap<KafkaTopicPartition, Long> offsetsToRestore = this.restoreToOffset;
        if (offsetsToRestore != null) {
            fetcher.restoreOffsets(offsetsToRestore);
        }

        // Publish the fetcher before checking the flag, so a cancel() issued from now on
        // either sees the fetcher or has already cleared 'running'.
        this.kafkaFetcher = fetcher;
        if (this.running) {
            fetcher.runFetchLoop();
        }
    }

    /**
     * Stops the source: clears the running flag and cancels the fetcher if one has been created.
     */
    public void cancel() {
        this.running = false;

        final AbstractFetcher<T, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            return;
        }
        fetcher.cancel();
    }

    /**
     * Determines the partitions this instance subscribes to by delegating to
     * assignTopicPartitions().
     *
     * @param configuration not used by this implementation
     */
    public void open(Configuration configuration) {
        assignTopicPartitions();
    }

    /**
     * Cancels the source and then closes the superclass.
     *
     * <p>{@code super.close()} runs exactly once, whether or not {@link #cancel()} throws.
     * The previous try/catch form invoked {@code super.close()} a second time when the first
     * invocation itself failed.
     *
     * @throws Exception if cancelling or closing the superclass fails
     */
    public void close() throws Exception {
        try {
            cancel();
        } finally {
            super.close();
        }
    }

    /**
     * Obtains the operator list state for offsets and, when restoring, determines the offsets
     * to resume from.
     *
     * <p>If {@code restoreToOffset} is already set (see {@link #restoreState(HashMap)}), it takes
     * precedence over the list state; an empty map is treated as "nothing to restore" and reset
     * to null. Otherwise the offsets are read from the list state.
     *
     * @param functionInitializationContext context providing the operator state store and the
     *        restored flag
     * @throws Exception if accessing the state fails
     */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        this.offsetsStateForCheckpoint = functionInitializationContext.getOperatorStateStore().getSerializableListState("_default_");

        if (!functionInitializationContext.isRestored()) {
            LOG.info("No restore state for FlinkKafkaConsumer.");
            return;
        }

        final HashMap<KafkaTopicPartition, Long> alreadyRestored = this.restoreToOffset;
        if (alreadyRestored != null) {
            if (alreadyRestored.isEmpty()) {
                this.restoreToOffset = null;
            }
            return;
        }

        // Fill a local map first and publish it only once it is complete.
        final HashMap<KafkaTopicPartition, Long> restoredOffsets = new HashMap<>();
        for (Tuple2<KafkaTopicPartition, Long> partitionOffset : this.offsetsStateForCheckpoint.get()) {
            restoredOffsets.put(partitionOffset.f0, partitionOffset.f1);
        }
        this.restoreToOffset = restoredOffsets;

        LOG.info("Setting restore state in the FlinkKafkaConsumer for consumer subtask {}: {}", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), restoredOffsets);
    }

    /**
     * Writes the current offsets into the operator list state and remembers them under the
     * checkpoint id, so they can be committed when the checkpoint is confirmed.
     *
     * <p>Before a fetcher exists, the restored offsets are written if present; otherwise every
     * subscribed partition is written with {@code KafkaTopicPartitionState.OFFSET_NOT_SET}.
     * Once a fetcher exists, its current state is snapshotted. Nothing is written after the
     * source has been cancelled.
     *
     * @param functionSnapshotContext context providing the checkpoint id
     * @throws Exception if updating the state or snapshotting the fetcher fails
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        if (!this.running) {
            LOG.debug("snapshotState() called on closed source");
            return;
        }

        this.offsetsStateForCheckpoint.clear();

        final AbstractFetcher<T, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            final HashMap<KafkaTopicPartition, Long> restoredOffsets = this.restoreToOffset;
            if (restoredOffsets != null) {
                for (Map.Entry<KafkaTopicPartition, Long> restored : restoredOffsets.entrySet()) {
                    this.offsetsStateForCheckpoint.add(Tuple2.of(restored.getKey(), restored.getValue()));
                }
            } else if (this.subscribedPartitions != null) {
                for (KafkaTopicPartition partition : this.subscribedPartitions) {
                    this.offsetsStateForCheckpoint.add(Tuple2.of(partition, Long.valueOf(KafkaTopicPartitionState.OFFSET_NOT_SET)));
                }
            }
            // May store null; notifyCheckpointComplete() treats a null entry as empty state.
            this.pendingOffsetsToCommit.put(Long.valueOf(functionSnapshotContext.getCheckpointId()), restoredOffsets);
        } else {
            final HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
            this.pendingOffsetsToCommit.put(Long.valueOf(functionSnapshotContext.getCheckpointId()), currentOffsets);
            for (Map.Entry<KafkaTopicPartition, Long> current : currentOffsets.entrySet()) {
                this.offsetsStateForCheckpoint.add(Tuple2.of(current.getKey(), current.getValue()));
            }
        }

        // Bound the map by evicting the oldest entries (index 0 is the oldest insertion).
        while (this.pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            this.pendingOffsetsToCommit.remove(0);
        }
    }

    /**
     * Accepts offsets restored through the {@code CheckpointedRestoring} path and stores them
     * as the offsets to resume from.
     *
     * @param hashMap restored offsets per partition
     */
    public void restoreState(HashMap<KafkaTopicPartition, Long> hashMap) {
        final String consumerName = getClass().getSimpleName();
        final Integer subtaskIndex = Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask());
        final Object[] logArguments = {consumerName, subtaskIndex, hashMap};
        LOG.info("{} (taskIdx={}) restoring offsets from an older version: {}", logArguments);

        this.restoreToOffset = hashMap;
    }

    /**
     * Commits the offsets that were recorded for the confirmed checkpoint.
     *
     * <p>The entry of the confirmed checkpoint and all entries inserted before it are removed
     * from the pending map. Nothing happens when the source is no longer running, when no
     * fetcher exists yet, or when the checkpoint id is unknown.
     *
     * @param j id of the completed checkpoint
     * @throws Exception if committing fails while the source is still running; failures after
     *         cancellation are deliberately ignored
     */
    public void notifyCheckpointComplete(long j) throws Exception {
        if (!this.running) {
            LOG.debug("notifyCheckpointComplete() called on closed source");
            return;
        }

        final AbstractFetcher<T, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + j);
        }

        try {
            final int position = this.pendingOffsetsToCommit.indexOf(Long.valueOf(j));
            if (position == -1) {
                LOG.warn("Received confirmation for unknown checkpoint id {}", Long.valueOf(j));
                return;
            }

            @SuppressWarnings("unchecked")
            final HashMap<KafkaTopicPartition, Long> offsets =
                    (HashMap<KafkaTopicPartition, Long>) this.pendingOffsetsToCommit.remove(position);

            // Everything inserted before the confirmed checkpoint is obsolete now.
            int olderEntries = position;
            while (olderEntries > 0) {
                this.pendingOffsetsToCommit.remove(0);
                olderEntries--;
            }

            if (offsets != null && !offsets.isEmpty()) {
                fetcher.commitInternalOffsetsToKafka(offsets);
            } else {
                LOG.debug("Checkpoint state was empty.");
            }
        } catch (Exception failure) {
            // Only surface the failure while running; after cancellation it is dropped.
            if (this.running) {
                throw failure;
            }
        }
    }

    /**
     * Creates the fetcher that run() uses to read from the subscribed partitions.
     *
     * @param sourceContext context passed to run() for emitting records and watermarks
     * @param list the subscribed partitions
     * @param serializedValue serialized periodic watermark assigner, or null if none was set
     * @param serializedValue2 serialized punctuated watermark assigner, or null if none was set
     * @param streamingRuntimeContext runtime context of this function
     * @return the fetcher; run() restores offsets into it and calls its fetch loop
     * @throws Exception if the fetcher cannot be created
     */
    protected abstract AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> sourceContext, List<KafkaTopicPartition> list, SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue, SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2, StreamingRuntimeContext streamingRuntimeContext) throws Exception;

    /**
     * Returns the partitions of the given topics. Called while assigning partitions in open()
     * when there are no restored offsets.
     *
     * @param list topic names (the ones passed to the constructor)
     * @return the partitions of those topics
     */
    protected abstract List<KafkaTopicPartition> getKafkaPartitions(List<String> list);

    /**
     * Returns the type information of the produced records, as reported by the deserializer.
     *
     * @return the deserializer's produced type
     */
    public TypeInformation<T> getProducedType() {
        return this.deserializer.getProducedType();
    }

    /**
     * Determines the partitions this subtask subscribes to.
     *
     * <p>With restored offsets, exactly the restored partitions are used. Otherwise the
     * partitions of all topics are fetched, ordered by topic and then partition number, and
     * distributed round-robin: this subtask takes every position whose index modulo the
     * parallelism equals its subtask index.
     */
    private void assignTopicPartitions() {
        final HashMap<KafkaTopicPartition, Long> restoredOffsets = this.restoreToOffset;
        if (restoredOffsets != null) {
            this.subscribedPartitions = new ArrayList<>(restoredOffsets.keySet());
            LOG.info("Consumer subtask {} will use the partitions in restored state as subscribed partitions: {}", Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), this.subscribedPartitions);
            return;
        }

        // Sort a copy: the list handed out by the subclass is neither mutated nor required
        // to be modifiable.
        final List<KafkaTopicPartition> sortedPartitions = new ArrayList<>(getKafkaPartitions(this.topics));
        Collections.sort(sortedPartitions, new Comparator<KafkaTopicPartition>() {
            @Override
            public int compare(KafkaTopicPartition first, KafkaTopicPartition second) {
                final int byTopic = first.getTopic().compareTo(second.getTopic());
                // Integer.compare instead of subtraction, which can overflow.
                return byTopic != 0 ? byTopic : Integer.compare(first.getPartition(), second.getPartition());
            }
        });
        LOG.info("Fetched a total of {} kafka partitions: {}", Integer.valueOf(sortedPartitions.size()), sortedPartitions);

        final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
        final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        this.subscribedPartitions = assignPartitions(sortedPartitions, parallelism, subtaskIndex);

        LOG.info("Consumer subtask {} will subscribe to {} partitions: {}", new Object[]{Integer.valueOf(subtaskIndex), Integer.valueOf(this.subscribedPartitions.size()), this.subscribedPartitions});
    }

    /**
     * Selects the partitions for one consumer: every element whose position in {@code list},
     * taken modulo {@code i}, equals {@code i2}.
     *
     * @param list all partitions, in the order used for distribution
     * @param i number of consumers the partitions are distributed over
     * @param i2 index of the consumer to select partitions for
     * @return a new list with the selected partitions, in their original order
     */
    protected static List<KafkaTopicPartition> assignPartitions(List<KafkaTopicPartition> list, int i, int i2) {
        final List<KafkaTopicPartition> selected = new ArrayList<>((list.size() / i) + 1);
        int position = 0;
        for (KafkaTopicPartition partition : list) {
            if (position % i == i2) {
                selected.add(partition);
            }
            position++;
        }
        return selected;
    }

    /**
     * Logs, at info level, each topic found in the given partitions together with the number
     * of its partitions.
     *
     * @param logger logger that receives the message
     * @param list partitions to summarize
     */
    protected static void logPartitionInfo(Logger logger, List<KafkaTopicPartition> list) {
        // Typed map: iterating the entry set of a raw map yields Object, not Map.Entry.
        final Map<String, Integer> partitionsPerTopic = new HashMap<>();
        for (KafkaTopicPartition partition : list) {
            final String topic = partition.getTopic();
            final Integer countSoFar = partitionsPerTopic.get(topic);
            partitionsPerTopic.put(topic, countSoFar == null ? 1 : countSoFar + 1);
        }

        final StringBuilder message = new StringBuilder("Consumer is going to read the following topics (with number of partitions): ");
        for (Map.Entry<String, Integer> topicCount : partitionsPerTopic.entrySet()) {
            message.append(topicCount.getKey()).append(" (").append(topicCount.getValue()).append("), ");
        }
        logger.info(message.toString());
    }

    /**
     * Exposes the subscribed partitions to tests.
     *
     * @return the current value of the subscribedPartitions field (null while unset); the
     *         list itself is returned, not a copy
     */
    @VisibleForTesting
    List<KafkaTopicPartition> getSubscribedPartitions() {
        return this.subscribedPartitions;
    }
}
