/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.collections.map.LinkedMap;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Common base for the Flink Kafka consumer sources.
 *
 * <p>This class distributes the subscribed partitions across the parallel subtasks, runs the
 * fetcher created by the concrete subclass (see {@link #createFetcher}), and keeps track of the
 * offsets snapshotted per checkpoint so that they can be handed back to the fetcher for
 * committing once the checkpoint is confirmed.
 *
 * <p>NOTE(review): {@code pendingCheckpoints} is read and modified by both
 * {@link #snapshotState(long, long)} and {@link #notifyCheckpointComplete(long)} without any
 * synchronization in this class; presumably the caller serializes these calls - confirm.
 *
 * @param <T> the type of records produced by this source
 */
public abstract class FlinkKafkaConsumerBase<T>
        extends RichParallelSourceFunction<T>
        implements CheckpointListener,
                CheckpointedAsynchronously<HashMap<KafkaTopicPartition, Long>>,
                ResultTypeQueryable<T> {

    private static final long serialVersionUID = -6272159445203409112L;

    protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);

    /** Upper bound on the number of checkpoints whose offsets are retained awaiting confirmation. */
    public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;

    /** Configuration key; not read by this class (presumably evaluated by subclasses - confirm). */
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";

    /** Schema used to report the produced type; passed in by the subclass constructor. */
    protected final KeyedDeserializationSchema<T> deserializer;

    /** All partitions this consumer subscribes to, as an unmodifiable list; null until set. */
    protected List<KafkaTopicPartition> allSubscribedPartitions;

    /** Serialized periodic watermark assigner; mutually exclusive with the punctuated one. */
    private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;

    /** Serialized punctuated watermark assigner; mutually exclusive with the periodic one. */
    private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;

    /** Insertion-ordered map from checkpoint id to the offsets snapshotted for that checkpoint. */
    private final LinkedMap pendingCheckpoints = new LinkedMap();

    /** The fetcher driving this subtask; null until {@link #run} has created it. */
    private volatile transient AbstractFetcher<T, ?> kafkaFetcher;

    /** Offsets handed in via {@link #restoreState}; null if nothing was restored. */
    private volatile transient HashMap<KafkaTopicPartition, Long> restoreToOffset;

    /** Cleared by {@link #cancel()}; checked by the run loop and the checkpoint callbacks. */
    private volatile boolean running = true;

    /**
     * Creates the consumer base.
     *
     * @param deserializer the schema describing the produced records; must not be null
     * @throws NullPointerException if {@code deserializer} is null
     */
    public FlinkKafkaConsumerBase(KeyedDeserializationSchema<T> deserializer) {
        this.deserializer = Preconditions.checkNotNull(deserializer, "valueDeserializer");
    }

    /**
     * Stores the full list of subscribed partitions as an unmodifiable view.
     *
     * @param allSubscribedPartitions the partitions of all subscribed topics; must not be null
     */
    protected void setSubscribedPartitions(List<KafkaTopicPartition> allSubscribedPartitions) {
        Preconditions.checkNotNull(allSubscribedPartitions);
        this.allSubscribedPartitions = Collections.unmodifiableList(allSubscribedPartitions);
    }

    /**
     * Registers a punctuated watermark assigner. The assigner is serialized immediately.
     *
     * @param assigner the assigner to use; must not be null
     * @return this consumer, to allow call chaining
     * @throws IllegalStateException if a periodic assigner has already been registered
     * @throws IllegalArgumentException if the assigner cannot be serialized
     */
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
        Preconditions.checkNotNull(assigner);
        if (this.periodicWatermarkAssigner != null) {
            throw new IllegalStateException("A periodic watermark emitter has already been set.");
        }
        try {
            this.punctuatedWatermarkAssigner =
                    new SerializedValue<AssignerWithPunctuatedWatermarks<T>>(assigner);
            return this;
        } catch (Exception e) {
            throw new IllegalArgumentException("The given assigner is not serializable", e);
        }
    }

    /**
     * Registers a periodic watermark assigner. The assigner is serialized immediately.
     *
     * @param assigner the assigner to use; must not be null
     * @return this consumer, to allow call chaining
     * @throws IllegalStateException if a punctuated assigner has already been registered
     * @throws IllegalArgumentException if the assigner cannot be serialized
     */
    public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
        Preconditions.checkNotNull(assigner);
        if (this.punctuatedWatermarkAssigner != null) {
            throw new IllegalStateException("A punctuated watermark emitter has already been set.");
        }
        try {
            this.periodicWatermarkAssigner =
                    new SerializedValue<AssignerWithPeriodicWatermarks<T>>(assigner);
            return this;
        } catch (Exception e) {
            throw new IllegalArgumentException("The given assigner is not serializable", e);
        }
    }

    /**
     * Runs the source: picks this subtask's partitions and, if there are any, creates the
     * fetcher, applies restored offsets and enters the fetch loop. A subtask without partitions
     * emits a {@code Long.MAX_VALUE} watermark and then blocks until it is cancelled.
     *
     * @param sourceContext the context handed to the fetcher and used to emit the watermark
     * @throws Exception if the partitions were never set, or if the fetcher fails
     */
    @Override
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        if (this.allSubscribedPartitions == null) {
            throw new Exception("The partitions were not set for the consumer");
        }

        final List<KafkaTopicPartition> thisSubtaskPartitions = assignPartitions(
                this.allSubscribedPartitions,
                getRuntimeContext().getNumberOfParallelSubtasks(),
                getRuntimeContext().getIndexOfThisSubtask());

        if (!thisSubtaskPartitions.isEmpty()) {
            final AbstractFetcher<T, ?> fetcher = createFetcher(
                    sourceContext,
                    thisSubtaskPartitions,
                    this.periodicWatermarkAssigner,
                    this.punctuatedWatermarkAssigner,
                    (StreamingRuntimeContext) getRuntimeContext());

            if (this.restoreToOffset != null) {
                fetcher.restoreOffsets(this.restoreToOffset);
            }

            // Publish the fetcher before re-checking 'running': a cancel() that raced with the
            // setup above either sees the fetcher and cancels it, or is observed right here.
            this.kafkaFetcher = fetcher;
            if (!this.running) {
                return;
            }
            fetcher.runFetchLoop();
        } else {
            sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));

            // The lock is local, so cancel() cannot notify it; the loop re-evaluates 'running'
            // only after the waiting thread is interrupted (or wakes up spuriously).
            final Object waitLock = new Object();
            while (this.running) {
                try {
                    synchronized (waitLock) {
                        waitLock.wait();
                    }
                } catch (InterruptedException e) {
                    if (!this.running) {
                        // Cancelled: restore the interrupt flag for the caller, then exit.
                        Thread.currentThread().interrupt();
                    }
                    // Still running: ignore the interrupt and keep waiting.
                }
            }
        }
    }

    /** Marks the source as stopped and cancels the fetcher if one has been created. */
    @Override
    public void cancel() {
        this.running = false;
        // Read the volatile field once so the null check and the call use the same instance.
        final AbstractFetcher<T, ?> fetcher = this.kafkaFetcher;
        if (fetcher != null) {
            fetcher.cancel();
        }
    }

    /**
     * Cancels the source and then delegates to {@code super.close()}, which is invoked even if
     * cancelling throws.
     */
    @Override
    public void close() throws Exception {
        try {
            cancel();
        } finally {
            super.close();
        }
    }

    /**
     * Snapshots the current offsets and remembers them under the checkpoint id until the
     * checkpoint is confirmed. At most {@link #MAX_NUM_PENDING_CHECKPOINTS} snapshots are
     * retained; the oldest ones are dropped beyond that.
     *
     * @param checkpointId the id of the checkpoint being taken
     * @param checkpointTimestamp the timestamp of the checkpoint (only logged)
     * @return the fetcher's current offsets; the restored offsets (possibly null) if no fetcher
     *         exists yet; or null if the source is no longer running
     */
    @Override
    public HashMap<KafkaTopicPartition, Long> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        if (!this.running) {
            LOG.debug("snapshotState() called on closed source");
            return null;
        }

        final AbstractFetcher<T, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            // The fetcher has not been created yet, so the restored offsets are passed through
            // unchanged. They are not recorded as a pending checkpoint.
            return this.restoreToOffset;
        }

        final HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotting state. Offsets: {}, checkpoint id: {}, timestamp: {}",
                    KafkaTopicPartition.toString(currentOffsets), checkpointId, checkpointTimestamp);
        }

        this.pendingCheckpoints.put(checkpointId, currentOffsets);
        // Bound the map so unconfirmed checkpoints cannot accumulate indefinitely.
        while (this.pendingCheckpoints.size() > MAX_NUM_PENDING_CHECKPOINTS) {
            this.pendingCheckpoints.remove(0);
        }
        return currentOffsets;
    }

    /**
     * Remembers the offsets to restore; they are applied to the fetcher when {@link #run} starts.
     *
     * @param restoredOffsets the offsets from the restored checkpoint
     */
    @Override
    public void restoreState(HashMap<KafkaTopicPartition, Long> restoredOffsets) {
        LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredOffsets);
        this.restoreToOffset = restoredOffsets;
    }

    /**
     * Hands the offsets recorded for the confirmed checkpoint to the fetcher for committing and
     * discards that entry together with all older pending entries.
     *
     * <p>Exceptions raised while the source is no longer running are swallowed; otherwise they
     * are rethrown.
     *
     * @param checkpointId the id of the checkpoint that completed
     * @throws Exception if looking up or committing the offsets fails while still running
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (!this.running) {
            LOG.debug("notifyCheckpointComplete() called on closed source");
            return;
        }

        final AbstractFetcher<T, ?> fetcher = this.kafkaFetcher;
        if (fetcher == null) {
            LOG.debug("notifyCheckpointComplete() called on uninitialized source");
            return;
        }

        LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint {}", checkpointId);

        try {
            final int posInMap = this.pendingCheckpoints.indexOf(checkpointId);
            if (posInMap == -1) {
                LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
                return;
            }

            // Only snapshotState() inserts into the map, and it always stores this value type.
            @SuppressWarnings("unchecked")
            final HashMap<KafkaTopicPartition, Long> checkpointOffsets =
                    (HashMap<KafkaTopicPartition, Long>) this.pendingCheckpoints.remove(posInMap);

            // Everything inserted before the confirmed checkpoint is now obsolete.
            for (int i = 0; i < posInMap; i++) {
                this.pendingCheckpoints.remove(0);
            }

            if (checkpointOffsets == null || checkpointOffsets.isEmpty()) {
                LOG.debug("Checkpoint state was empty.");
                return;
            }
            fetcher.commitSpecificOffsetsToKafka(checkpointOffsets);
        } catch (Exception e) {
            // Failures after cancellation are deliberately ignored.
            if (this.running) {
                throw e;
            }
        }
    }

    /**
     * Creates the fetcher that reads the given partitions. Implemented by the concrete consumer.
     *
     * @param var1 the source context passed to {@link #run}
     * @param var2 the partitions assigned to this subtask
     * @param var3 the serialized periodic watermark assigner, or null if none was set
     * @param var4 the serialized punctuated watermark assigner, or null if none was set
     * @param var5 the runtime context of this source
     * @return the fetcher to run
     * @throws Exception if the fetcher cannot be created
     */
    protected abstract AbstractFetcher<T, ?> createFetcher(SourceFunction.SourceContext<T> var1, List<KafkaTopicPartition> var2, SerializedValue<AssignerWithPeriodicWatermarks<T>> var3, SerializedValue<AssignerWithPunctuatedWatermarks<T>> var4, StreamingRuntimeContext var5) throws Exception;

    /** Returns the produced type as reported by the deserialization schema. */
    @Override
    public TypeInformation<T> getProducedType() {
        return this.deserializer.getProducedType();
    }

    /**
     * Selects the partitions for one consumer by round-robin over the list index: the partition
     * at index {@code i} goes to the consumer with {@code i % numConsumers == consumerIndex}.
     *
     * @param allPartitions all partitions to distribute
     * @param numConsumers the total number of consumers; must be positive
     * @param consumerIndex the index of the consumer to select partitions for
     * @return a new list with the partitions assigned to that consumer (possibly empty)
     * @throws IllegalArgumentException if {@code numConsumers} is not positive
     */
    protected static List<KafkaTopicPartition> assignPartitions(List<KafkaTopicPartition> allPartitions, int numConsumers, int consumerIndex) {
        if (numConsumers <= 0) {
            // Fail with a clear message instead of an ArithmeticException from the division.
            throw new IllegalArgumentException("numConsumers must be positive, but was " + numConsumers);
        }

        final int numPartitions = allPartitions.size();
        final List<KafkaTopicPartition> thisSubtaskPartitions =
                new ArrayList<KafkaTopicPartition>(numPartitions / numConsumers + 1);

        for (int i = 0; i < numPartitions; i++) {
            if (i % numConsumers == consumerIndex) {
                thisSubtaskPartitions.add(allPartitions.get(i));
            }
        }
        return thisSubtaskPartitions;
    }

    /**
     * Logs, at INFO level, every topic in the given list together with its number of partitions.
     *
     * @param logger the logger to write to
     * @param partitionInfos the partitions to summarize
     */
    protected static void logPartitionInfo(Logger logger, List<KafkaTopicPartition> partitionInfos) {
        final Map<String, Integer> countPerTopic = new HashMap<String, Integer>();
        for (KafkaTopicPartition partition : partitionInfos) {
            final String topic = partition.getTopic();
            final Integer count = countPerTopic.get(topic);
            countPerTopic.put(topic, count == null ? 1 : count + 1);
        }

        final StringBuilder sb = new StringBuilder(
                "Consumer is going to read the following topics (with number of partitions): ");
        for (Map.Entry<String, Integer> e : countPerTopic.entrySet()) {
            sb.append(e.getKey()).append(" (").append(e.getValue()).append("), ");
        }
        logger.info(sb.toString());
    }
}

