/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.common.TopicAndPartition;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.KillerWatchDog;
import org.apache.flink.streaming.connectors.kafka.internals.PartitionInfoFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.PeriodicOffsetCommitter;
import org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Fetcher that reads the subscribed Kafka partitions through {@link SimpleConsumerThread}s
 * (one per leader broker {@link Node}) and reads/writes partition offsets through a
 * {@link ZookeeperOffsetHandler}.
 *
 * <p>{@link #runFetchLoop()} drives the assignment of partitions to broker threads;
 * {@link #cancel()} asks that loop to stop.
 *
 * @param <T> element type handled by the {@link KeyedDeserializationSchema}
 */
public class Kafka08Fetcher<T>
extends AbstractFetcher<T, TopicAndPartition> {
    /**
     * Sentinel element for the unassigned-partitions queue. {@link #cancel()} enqueues it to
     * wake up the fetch loop, and the loop removes it from every batch it polls.
     */
    static final KafkaTopicPartitionState<TopicAndPartition> MARKER = new KafkaTopicPartitionState(new KafkaTopicPartition("n/a", -1), (Object)new TopicAndPartition("n/a", -1));
    private static final Logger LOG = LoggerFactory.getLogger(Kafka08Fetcher.class);
    /** Schema that is cloned for every consumer thread that gets created. */
    private final KeyedDeserializationSchema<T> deserializer;
    /** Kafka configuration handed to the ZooKeeper handler, the consumer threads and the leader lookup. */
    private final Properties kafkaConfig;
    /** Runtime context; used for the metric group, the user-code class loader and the task name. */
    private final RuntimeContext runtimeContext;
    /**
     * Partitions that are currently not assigned to a broker thread. Seeded with all subscribed
     * partitions in the constructor and also handed to every consumer thread
     * (presumably so a thread can return partitions it can no longer serve - confirm in SimpleConsumerThread).
     */
    private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsQueue;
    /** Passed through unchanged to every consumer thread; its meaning is defined there. */
    private final long invalidOffsetBehavior;
    /** Interval (logged as milliseconds) of the periodic offset committer; values <= 0 disable it. */
    private final long autoCommitInterval;
    /** Set while the fetch loop is running and reset to null when it ends; read by commitSpecificOffsetsToKafka. */
    private volatile ZookeeperOffsetHandler zookeeperOffsetHandler;
    /** Cleared by cancel() and when the fetch loop terminates. */
    private volatile boolean running = true;

    public Kafka08Fetcher(SourceFunction.SourceContext<T> sourceContext, List<KafkaTopicPartition> assignedPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, KeyedDeserializationSchema<T> deserializer, Properties kafkaProperties, long invalidOffsetBehavior, long autoCommitInterval, boolean useMetrics) throws Exception {
        super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, useMetrics);
        this.deserializer = (KeyedDeserializationSchema)Preconditions.checkNotNull(deserializer);
        this.kafkaConfig = (Properties)Preconditions.checkNotNull((Object)kafkaProperties);
        this.runtimeContext = runtimeContext;
        this.invalidOffsetBehavior = invalidOffsetBehavior;
        this.autoCommitInterval = autoCommitInterval;
        this.unassignedPartitionsQueue = new ClosableBlockingQueue();
        for (KafkaTopicPartitionState partition : this.subscribedPartitions()) {
            this.unassignedPartitionsQueue.add((KafkaTopicPartitionState<TopicAndPartition>)partition);
        }
    }

    /*
     * WARNING - void declaration
     */
    public void runFetchLoop() throws Exception {
        ZookeeperOffsetHandler zookeeperOffsetHandler;
        HashMap<Node, SimpleConsumerThread<T>> brokerToThread = new HashMap<Node, SimpleConsumerThread<T>>();
        ExceptionProxy errorHandler = new ExceptionProxy(Thread.currentThread());
        this.zookeeperOffsetHandler = zookeeperOffsetHandler = new ZookeeperOffsetHandler(this.kafkaConfig);
        PeriodicOffsetCommitter periodicCommitter = null;
        try {
            void var8_14;
            ArrayList<KafkaTopicPartition> partitionsWithNoOffset = new ArrayList<KafkaTopicPartition>();
            KafkaTopicPartitionState[] kafkaTopicPartitionStateArray = this.subscribedPartitions();
            int n = kafkaTopicPartitionStateArray.length;
            boolean bl = false;
            while (var8_14 < n) {
                KafkaTopicPartitionState partition = kafkaTopicPartitionStateArray[var8_14];
                if (!partition.isOffsetDefined()) {
                    partitionsWithNoOffset.add(partition.getKafkaTopicPartition());
                }
                ++var8_14;
            }
            Map<KafkaTopicPartition, Long> zkOffsets = zookeeperOffsetHandler.getOffsets(partitionsWithNoOffset);
            for (KafkaTopicPartitionState partition : this.subscribedPartitions()) {
                Long offset = zkOffsets.get(partition.getKafkaTopicPartition());
                if (offset == null) continue;
                partition.setOffset(offset.longValue());
            }
            if (this.autoCommitInterval > 0L) {
                LOG.info("Starting periodic offset committer, with commit interval of {}ms", (Object)this.autoCommitInterval);
                periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler, this.subscribedPartitions(), errorHandler, this.autoCommitInterval);
                periodicCommitter.setName("Periodic Kafka partition offset committer");
                periodicCommitter.setDaemon(true);
                periodicCommitter.start();
            }
            if (this.useMetrics) {
                MetricGroup kafkaMetricGroup = this.runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
                this.addOffsetStateGauge(kafkaMetricGroup);
            }
            while (this.running) {
                errorHandler.checkAndThrowException();
                List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign = this.unassignedPartitionsQueue.getBatchBlocking(5000L);
                partitionsToAssign.remove(MARKER);
                if (!partitionsToAssign.isEmpty()) {
                    LOG.info("Assigning {} partitions to broker threads", (Object)partitionsToAssign.size());
                    Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders = Kafka08Fetcher.findLeaderForPartitions(partitionsToAssign, this.kafkaConfig);
                    for (Map.Entry entry : partitionsWithLeaders.entrySet()) {
                        Node leader = (Node)entry.getKey();
                        List partitions = (List)entry.getValue();
                        SimpleConsumerThread<T> brokerThread = (SimpleConsumerThread<T>)brokerToThread.get(leader);
                        if (this.running) {
                            if (brokerThread == null || !brokerThread.getNewPartitionsQueue().isOpen()) {
                                brokerThread = this.createAndStartSimpleConsumerThread(partitions, leader, errorHandler);
                                brokerToThread.put(leader, brokerThread);
                                continue;
                            }
                            ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue = brokerThread.getNewPartitionsQueue();
                            for (KafkaTopicPartitionState fp : partitions) {
                                if (newPartitionsQueue.addIfOpen((KafkaTopicPartitionState<TopicAndPartition>)fp)) continue;
                                ArrayList<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new ArrayList<KafkaTopicPartitionState<TopicAndPartition>>();
                                seedPartitions.add(fp);
                                brokerThread = this.createAndStartSimpleConsumerThread(seedPartitions, leader, errorHandler);
                                brokerToThread.put(leader, brokerThread);
                                newPartitionsQueue = brokerThread.getNewPartitionsQueue();
                            }
                            continue;
                        }
                        break;
                    }
                } else {
                    Iterator bttIterator = brokerToThread.values().iterator();
                    while (bttIterator.hasNext()) {
                        SimpleConsumerThread thread = (SimpleConsumerThread)bttIterator.next();
                        if (thread.getNewPartitionsQueue().isOpen()) continue;
                        LOG.info("Removing stopped consumer thread {}", (Object)thread.getName());
                        bttIterator.remove();
                    }
                }
                if (brokerToThread.size() != 0 || !this.unassignedPartitionsQueue.isEmpty() || !this.unassignedPartitionsQueue.close()) continue;
                LOG.info("All consumer threads are finished, there are no more unassigned partitions. Stopping fetcher");
                break;
            }
        }
        catch (InterruptedException e) {
            errorHandler.checkAndThrowException();
            throw e;
        }
        finally {
            this.running = false;
            this.zookeeperOffsetHandler = null;
            if (periodicCommitter != null) {
                periodicCommitter.shutdown();
            }
            try {
                int runningThreads;
                do {
                    runningThreads = 0;
                    Iterator threads = brokerToThread.values().iterator();
                    while (threads.hasNext()) {
                        SimpleConsumerThread t = (SimpleConsumerThread)threads.next();
                        if (t.isAlive()) {
                            t.cancel();
                            ++runningThreads;
                            continue;
                        }
                        threads.remove();
                    }
                    if (runningThreads <= 0) continue;
                    for (SimpleConsumerThread t : brokerToThread.values()) {
                        t.join(500 / runningThreads + 1);
                    }
                } while (runningThreads > 0);
            }
            catch (Throwable t) {
                LOG.error("Exception while shutting down consumer threads", t);
            }
            try {
                zookeeperOffsetHandler.close();
            }
            catch (Throwable t) {
                LOG.error("Exception while shutting down ZookeeperOffsetHandler", t);
            }
        }
    }

    /**
     * Requests the fetch loop to stop: clears the running flag and then enqueues the
     * {@link #MARKER} (if the queue is still open) so that a loop blocked on the queue wakes up.
     */
    public void cancel() {
        // flag first, so the loop already sees running == false when the marker wakes it up
        running = false;

        // the result is ignored on purpose: a closed queue means there is nothing left to wake
        unassignedPartitionsQueue.addIfOpen(MARKER);
    }

    /**
     * Builds the Kafka-specific handle for a partition.
     *
     * @param partition the partition to create the handle for
     * @return a {@link TopicAndPartition} with the same topic name and partition number
     */
    public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
        final String topic = partition.getTopic();
        final int partitionNumber = partition.getPartition();
        return new TopicAndPartition(topic, partitionNumber);
    }

    /**
     * Writes the given offsets through the ZooKeeper handler (when the fetch loop currently
     * provides one) and records them as committed offsets on the matching subscribed partitions.
     *
     * @param offsets offsets to commit, keyed by partition; subscribed partitions without an
     *                entry are left unchanged
     * @throws Exception propagated from the ZooKeeper handler
     */
    public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
        // read the volatile field once; the fetch loop may reset it to null concurrently
        final ZookeeperOffsetHandler handler = this.zookeeperOffsetHandler;
        if (handler != null) {
            handler.writeOffsets(offsets);
        }

        // remember what was committed in the per-partition state
        for (KafkaTopicPartitionState<TopicAndPartition> state : subscribedPartitions()) {
            final Long committed = offsets.get(state.getKafkaTopicPartition());
            if (committed != null) {
                state.setCommittedOffset(committed.longValue());
            }
        }
    }

    /**
     * Creates a consumer thread for one broker, names it, marks it as daemon and starts it.
     *
     * @param seedPartitions partitions the new thread starts out with
     * @param leader         broker the thread is created for
     * @param errorHandler   error channel handed to the thread
     * @return the already started thread
     * @throws IOException            if cloning the deserializer fails
     * @throws ClassNotFoundException if cloning the deserializer fails
     */
    private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(
            List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
            Node leader,
            ExceptionProxy errorHandler) throws IOException, ClassNotFoundException {
        // every thread works on its own copy of the schema, cloned via the user-code class loader
        final ClassLoader userCodeClassLoader = this.runtimeContext.getUserCodeClassLoader();
        final KeyedDeserializationSchema<T> schemaCopy =
                (KeyedDeserializationSchema<T>) InstantiationUtil.clone(this.deserializer, userCodeClassLoader);

        final SimpleConsumerThread<T> consumerThread = new SimpleConsumerThread<T>(
                this,
                errorHandler,
                this.kafkaConfig,
                leader,
                seedPartitions,
                this.unassignedPartitionsQueue,
                schemaCopy,
                this.invalidOffsetBehavior);

        final String threadName = String.format(
                "SimpleConsumer - %s - broker-%s (%s:%d)",
                this.runtimeContext.getTaskName(), leader.id(), leader.host(), leader.port());
        consumerThread.setName(threadName);
        consumerThread.setDaemon(true);
        consumerThread.start();

        LOG.info("Starting thread {}", consumerThread.getName());
        return consumerThread;
    }

    /**
     * Collects the distinct topic names of the given partitions.
     *
     * @param partitions partition states to inspect
     * @return a new list containing every topic name once, in the iteration order of a {@link HashSet}
     */
    private static List<String> getTopics(List<KafkaTopicPartitionState<TopicAndPartition>> partitions) {
        final HashSet<String> topicNames = new HashSet<String>();
        final Iterator<KafkaTopicPartitionState<TopicAndPartition>> states = partitions.iterator();
        while (states.hasNext()) {
            topicNames.add(states.next().getTopic());
        }

        final List<String> distinctTopics = new ArrayList<String>(topicNames.size());
        distinctTopics.addAll(topicNames);
        return distinctTopics;
    }

    /**
     * Looks up the current leader broker for each of the given partitions and groups the
     * partitions by leader.
     *
     * @param partitionsToAssign partitions that need a leader; must not be empty
     * @param kafkaProperties    Kafka configuration used for the lookup
     * @return a new map from leader broker to the partitions it leads; every input partition
     *         appears in exactly one list
     * @throws IllegalArgumentException if {@code partitionsToAssign} is empty
     * @throws RuntimeException         if no leader is reported for at least one partition
     * @throws Exception                propagated from the partition info lookup
     */
    private static Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> findLeaderForPartitions(
            List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign,
            Properties kafkaProperties) throws Exception {
        if (partitionsToAssign.isEmpty()) {
            throw new IllegalArgumentException("Leader request for empty partitions list");
        }

        LOG.info("Refreshing leader information for partitions {}", partitionsToAssign);

        // the lookup is requested per topic and runs in its own thread
        final PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(getTopics(partitionsToAssign), kafkaProperties);
        infoFetcher.start();

        // NOTE(review): presumably terminates the info fetcher if it has not finished after
        // 60000 ms - confirm against KillerWatchDog
        final KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000L);
        watchDog.start();

        // leader information as reported by the info fetcher; may contain partitions we did not ask for
        final List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();

        // work on a copy so the caller's list stays untouched; matched partitions are removed from it
        final List<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions =
                new ArrayList<KafkaTopicPartitionState<TopicAndPartition>>(partitionsToAssign);
        final Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> leaderToPartitions =
                new HashMap<Node, List<KafkaTopicPartitionState<TopicAndPartition>>>();

        for (KafkaTopicPartitionLeader partitionLeader : topicPartitionWithLeaderList) {
            if (unassignedPartitions.isEmpty()) {
                // everything is matched, no need to look at the remaining leader entries
                break;
            }

            final Iterator<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsIterator =
                    unassignedPartitions.iterator();
            while (unassignedPartitionsIterator.hasNext()) {
                final KafkaTopicPartitionState<TopicAndPartition> unassignedPartition = unassignedPartitionsIterator.next();
                if (unassignedPartition.getKafkaTopicPartition().equals(partitionLeader.getTopicPartition())) {
                    final Node leader = partitionLeader.getLeader();
                    List<KafkaTopicPartitionState<TopicAndPartition>> partitionsOfLeader = leaderToPartitions.get(leader);
                    if (partitionsOfLeader == null) {
                        partitionsOfLeader = new ArrayList<KafkaTopicPartitionState<TopicAndPartition>>();
                        leaderToPartitions.put(leader, partitionsOfLeader);
                    }
                    partitionsOfLeader.add(unassignedPartition);
                    unassignedPartitionsIterator.remove();
                    // at most one partition is matched per leader entry; go on with the next entry
                    break;
                }
            }
        }

        if (!unassignedPartitions.isEmpty()) {
            throw new RuntimeException("Unable to find a leader for partitions: " + unassignedPartitions);
        }

        LOG.debug("Partitions with assigned leaders {}", leaderToPartitions);
        return leaderToPartitions;
    }
}

