/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.util.KafkaUtils;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for Kafka consumer sources.
 *
 * <p>This class owns the checkpoint bookkeeping shared by concrete consumers: every call to
 * {@link #snapshotState(long, long)} stores a copy of the current partition offsets in
 * {@link #pendingCheckpoints}, and {@link #notifyCheckpointComplete(long)} looks that copy up
 * again and passes it to {@link #commitOffsets(HashMap)}.
 *
 * <p>All accesses to {@link #pendingCheckpoints} made by this class happen while holding the
 * monitor of that map.
 *
 * @param <T> the element type described by the configured {@link KeyedDeserializationSchema}
 */
public abstract class FlinkKafkaConsumerBase<T>
extends RichParallelSourceFunction<T>
implements CheckpointListener,
CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>,
ResultTypeQueryable<T> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
    private static final long serialVersionUID = -6272159445203409112L;

    /** Upper bound on the number of entries kept in {@link #pendingCheckpoints}. */
    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

    /** Schema used to report the produced type; never {@code null}. */
    protected final KeyedDeserializationSchema<T> deserializer;

    /**
     * Offsets snapshotted per checkpoint, keyed by checkpoint id ({@code Long}) with
     * {@code HashMap<KafkaTopicPartition, Long>} values, in insertion order.
     */
    protected final LinkedMap pendingCheckpoints = new LinkedMap();

    /**
     * Current offsets per partition. This class only reads the field; a {@code null} value is
     * treated as "source not opened yet".
     */
    protected transient HashMap<KafkaTopicPartition, Long> offsetsState;

    /** Offsets handed to {@link #restoreState(HashMap)}; this class only stores them. */
    protected transient HashMap<KafkaTopicPartition, Long> restoreToOffset;

    /** While {@code false}, snapshot and checkpoint-confirmation calls are ignored. */
    protected volatile boolean running = true;

    /**
     * Creates the base consumer.
     *
     * @param deserializer schema describing the produced records; must not be {@code null}
     * @param props consumer properties; not read by this constructor
     * @throws NullPointerException if {@code deserializer} is {@code null}
     */
    public FlinkKafkaConsumerBase(KeyedDeserializationSchema<T> deserializer, Properties props) {
        this.deserializer = Objects.requireNonNull(deserializer, "valueDeserializer");
    }

    /**
     * Copies the current offsets, remembers the copy under {@code checkpointId} and returns it.
     *
     * <p>When more than {@link #MAX_NUM_PENDING_CHECKPOINTS} snapshots are pending, the entries
     * at the front of {@link #pendingCheckpoints} are dropped until the bound holds again.
     *
     * @param checkpointId id under which the snapshot is remembered
     * @param checkpointTimestamp timestamp of the checkpoint; only used for logging
     * @return the offset snapshot, or {@code null} if {@link #offsetsState} is {@code null} or
     *     the source is no longer running
     */
    public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        if (this.offsetsState == null) {
            LOG.debug("snapshotState() requested on not yet opened source; returning null.");
            return null;
        }
        if (!this.running) {
            LOG.debug("snapshotState() called on closed source");
            return null;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}", new Object[]{KafkaTopicPartition.toString(this.offsetsState), checkpointId, checkpointTimestamp});
        }

        // A shallow copy is sufficient: only the mapping is detached from offsetsState, the
        // keys and values themselves are shared.
        HashMap<KafkaTopicPartition, Long> currentOffsets =
                new HashMap<KafkaTopicPartition, Long>(this.offsetsState);

        // notifyCheckpointComplete() guards this map with the same monitor, so take it here
        // as well to keep the insert-and-trim step atomic with respect to it.
        synchronized (this.pendingCheckpoints) {
            this.pendingCheckpoints.put(Long.valueOf(checkpointId), currentOffsets);
            while (this.pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                this.pendingCheckpoints.remove(0);
            }
        }
        return currentOffsets;
    }

    /**
     * Stores the given offsets in {@link #restoreToOffset}.
     *
     * @param restoredOffsets offsets to remember; stored as-is without copying
     */
    public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
        LOG.info("Setting restore state in Kafka");
        this.restoreToOffset = restoredOffsets;
    }

    /**
     * Commits the offsets that were snapshotted for the given checkpoint.
     *
     * <p>The snapshot for {@code checkpointId} and every entry stored in front of it are removed
     * from {@link #pendingCheckpoints}; a non-empty snapshot is then passed to
     * {@link #commitOffsets(HashMap)}. The call is a no-op if {@link #offsetsState} is
     * {@code null}, if the source is no longer running, or if the checkpoint id is unknown.
     *
     * @param checkpointId id of the confirmed checkpoint
     * @throws Exception any failure raised while committing, unless the source has stopped
     *     running in the meantime
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (this.offsetsState == null) {
            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
            return;
        }
        if (!this.running) {
            LOG.debug("notifyCheckpointComplete() called on closed source");
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Committing offsets externally for checkpoint {}", Long.valueOf(checkpointId));
        }

        try {
            HashMap<KafkaTopicPartition, Long> checkpointOffsets;
            synchronized (this.pendingCheckpoints) {
                int posInMap = this.pendingCheckpoints.indexOf(Long.valueOf(checkpointId));
                if (posInMap == -1) {
                    LOG.warn("Received confirmation for unknown checkpoint id {}", Long.valueOf(checkpointId));
                    return;
                }

                // Safe: snapshotState() is the only writer in this class and always stores
                // HashMap<KafkaTopicPartition, Long> values.
                @SuppressWarnings("unchecked")
                HashMap<KafkaTopicPartition, Long> confirmed =
                        (HashMap<KafkaTopicPartition, Long>) this.pendingCheckpoints.remove(posInMap);
                checkpointOffsets = confirmed;

                // Everything that was stored before the confirmed checkpoint is now obsolete.
                for (int i = 0; i < posInMap; ++i) {
                    this.pendingCheckpoints.remove(0);
                }
            }

            if (checkpointOffsets == null || checkpointOffsets.isEmpty()) {
                LOG.debug("Checkpoint state was empty.");
                return;
            }

            // Deliberately outside the lock: the subclass hook must not run under our monitor.
            this.commitOffsets(checkpointOffsets);
        }
        catch (Exception e) {
            if (this.running) {
                throw e;
            }
            // The source stopped while we were committing; the failure is intentionally not
            // propagated, but keep a trace of it for debugging.
            LOG.debug("Ignoring failure for checkpoint {} because the source is no longer running",
                    Long.valueOf(checkpointId), e);
        }
    }

    /**
     * Commits the given offsets. Invoked by {@link #notifyCheckpointComplete(long)} with the
     * non-empty snapshot that belongs to the confirmed checkpoint.
     *
     * @param checkpointOffsets offsets per partition to commit; never {@code null} or empty
     *     when called from this class
     * @throws Exception if committing fails
     */
    protected abstract void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) throws Exception;

    /**
     * Returns the type information reported by the configured deserializer.
     *
     * @return {@code deserializer.getProducedType()}
     */
    public TypeInformation<T> getProducedType() {
        return this.deserializer.getProducedType();
    }

    /**
     * Selects the partitions for one consumer by round-robin over the list positions: the
     * element at position {@code i} is selected when {@code i % numConsumers == consumerIndex}.
     *
     * <p>The arguments are validated with {@code KafkaUtils.checkArgument}; note that only the
     * upper bound of {@code consumerIndex} is checked, so a negative index selects nothing.
     *
     * @param partitions all partitions, in the order used for the assignment
     * @param numConsumers total number of consumers; must be greater than zero
     * @param consumerIndex index of the consumer to select for; must be less than
     *     {@code numConsumers}
     * @param <T> the partition type
     * @return a new list with the selected partitions, in their original relative order
     */
    protected static <T> List<T> assignPartitions(List<T> partitions, int numConsumers, int consumerIndex) {
        KafkaUtils.checkArgument(numConsumers > 0);
        KafkaUtils.checkArgument(consumerIndex < numConsumers);

        List<T> partitionsToSub = new ArrayList<T>();
        for (int i = 0; i < partitions.size(); ++i) {
            if (i % numConsumers == consumerIndex) {
                partitionsToSub.add(partitions.get(i));
            }
        }
        return partitionsToSub;
    }

    /**
     * Logs, at INFO level, every topic found in the given partitions together with the number
     * of partitions seen for it.
     *
     * @param partitionInfos the partitions to summarize
     */
    public static void logPartitionInfo(List<KafkaTopicPartition> partitionInfos) {
        Map<String, Integer> countPerTopic = new HashMap<String, Integer>();
        for (KafkaTopicPartition partition : partitionInfos) {
            String topic = partition.getTopic();
            Integer count = countPerTopic.get(topic);
            countPerTopic.put(topic, count == null ? 1 : count + 1);
        }

        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
            sb.append(e.getKey()).append(" (").append(e.getValue()).append("), ");
        }

        // The "{}" placeholder is required; without it the summary argument is silently dropped.
        LOG.info("Consumer is going to read the following topics (with number of partitions): {}", sb.toString());
    }
}

