/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nonnull;
import kafka.api.OffsetRequest;
import kafka.common.TopicAndPartition;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.KillerWatchDog;
import org.apache.flink.streaming.connectors.kafka.internals.PartitionInfoFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.PeriodicOffsetCommitter;
import org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread;
import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Kafka08Fetcher<T>
extends AbstractFetcher<T, TopicAndPartition> {
    static final KafkaTopicPartitionState<TopicAndPartition> MARKER = new KafkaTopicPartitionState(new KafkaTopicPartition("n/a", -1), (Object)new TopicAndPartition("n/a", -1));
    private static final Logger LOG = LoggerFactory.getLogger(Kafka08Fetcher.class);
    private final KeyedDeserializationSchema<T> deserializer;
    private final Properties kafkaConfig;
    private final RuntimeContext runtimeContext;
    private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsQueue;
    private final long invalidOffsetBehavior;
    private final long autoCommitInterval;
    private volatile ZookeeperOffsetHandler zookeeperOffsetHandler;
    private volatile boolean running = true;

    public Kafka08Fetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, StreamingRuntimeContext runtimeContext, KeyedDeserializationSchema<T> deserializer, Properties kafkaProperties, long autoCommitInterval, boolean useMetrics) throws Exception {
        super(sourceContext, assignedPartitionsWithInitialOffsets, watermarksPeriodic, watermarksPunctuated, runtimeContext.getProcessingTimeService(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval(), runtimeContext.getUserCodeClassLoader(), useMetrics);
        this.deserializer = (KeyedDeserializationSchema)Preconditions.checkNotNull(deserializer);
        this.kafkaConfig = (Properties)Preconditions.checkNotNull((Object)kafkaProperties);
        this.runtimeContext = runtimeContext;
        this.invalidOffsetBehavior = Kafka08Fetcher.getInvalidOffsetBehavior(kafkaProperties);
        this.autoCommitInterval = autoCommitInterval;
        this.unassignedPartitionsQueue = new ClosableBlockingQueue();
        for (KafkaTopicPartitionState partition : this.subscribedPartitionStates()) {
            this.unassignedPartitionsQueue.add((KafkaTopicPartitionState<TopicAndPartition>)partition);
        }
    }

    public void runFetchLoop() throws Exception {
        ZookeeperOffsetHandler zookeeperOffsetHandler;
        HashMap<Node, SimpleConsumerThread<T>> brokerToThread = new HashMap<Node, SimpleConsumerThread<T>>();
        ExceptionProxy errorHandler = new ExceptionProxy(Thread.currentThread());
        this.zookeeperOffsetHandler = zookeeperOffsetHandler = new ZookeeperOffsetHandler(this.kafkaConfig);
        PeriodicOffsetCommitter periodicCommitter = null;
        try {
            for (KafkaTopicPartitionState partition : this.subscribedPartitionStates()) {
                if (partition.getOffset() == -915623761775L) {
                    partition.setOffset(OffsetRequest.EarliestTime());
                    continue;
                }
                if (partition.getOffset() == -915623761774L) {
                    partition.setOffset(OffsetRequest.LatestTime());
                    continue;
                }
                if (partition.getOffset() != -915623761773L) continue;
                Long committedOffset = zookeeperOffsetHandler.getCommittedOffset(partition.getKafkaTopicPartition());
                if (committedOffset != null) {
                    partition.setOffset(committedOffset - 1L);
                    continue;
                }
                LOG.warn("No group offset can be found for partition {} in Zookeeper; resetting starting offset to 'auto.offset.reset'", (Object)partition);
                partition.setOffset(this.invalidOffsetBehavior);
            }
            if (this.autoCommitInterval > 0L) {
                LOG.info("Starting periodic offset committer, with commit interval of {}ms", (Object)this.autoCommitInterval);
                periodicCommitter = new PeriodicOffsetCommitter(zookeeperOffsetHandler, this.subscribedPartitionStates(), errorHandler, this.autoCommitInterval);
                periodicCommitter.setName("Periodic Kafka partition offset committer");
                periodicCommitter.setDaemon(true);
                periodicCommitter.start();
            }
            if (this.useMetrics) {
                MetricGroup kafkaMetricGroup = this.runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
                this.addOffsetStateGauge(kafkaMetricGroup);
            }
            while (this.running) {
                errorHandler.checkAndThrowException();
                List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign = this.unassignedPartitionsQueue.getBatchBlocking(5000L);
                partitionsToAssign.remove(MARKER);
                if (!partitionsToAssign.isEmpty()) {
                    LOG.info("Assigning {} partitions to broker threads", (Object)partitionsToAssign.size());
                    Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeaders = Kafka08Fetcher.findLeaderForPartitions(partitionsToAssign, this.kafkaConfig);
                    for (Map.Entry<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> partitionsWithLeader : partitionsWithLeaders.entrySet()) {
                        Node leader = partitionsWithLeader.getKey();
                        List<KafkaTopicPartitionState<TopicAndPartition>> partitions = partitionsWithLeader.getValue();
                        SimpleConsumerThread<T> brokerThread = (SimpleConsumerThread<T>)brokerToThread.get(leader);
                        if (this.running) {
                            if (brokerThread == null || !brokerThread.getNewPartitionsQueue().isOpen()) {
                                brokerThread = this.createAndStartSimpleConsumerThread(partitions, leader, errorHandler);
                                brokerToThread.put(leader, brokerThread);
                                continue;
                            }
                            ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue = brokerThread.getNewPartitionsQueue();
                            for (KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
                                if (newPartitionsQueue.addIfOpen(fp)) continue;
                                ArrayList<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions = new ArrayList<KafkaTopicPartitionState<TopicAndPartition>>();
                                seedPartitions.add(fp);
                                brokerThread = this.createAndStartSimpleConsumerThread(seedPartitions, leader, errorHandler);
                                brokerToThread.put(leader, brokerThread);
                                newPartitionsQueue = brokerThread.getNewPartitionsQueue();
                            }
                            continue;
                        }
                        break;
                    }
                } else {
                    Iterator bttIterator = brokerToThread.values().iterator();
                    while (bttIterator.hasNext()) {
                        SimpleConsumerThread thread = (SimpleConsumerThread)bttIterator.next();
                        if (thread.getNewPartitionsQueue().isOpen()) continue;
                        LOG.info("Removing stopped consumer thread {}", (Object)thread.getName());
                        bttIterator.remove();
                    }
                }
                if (brokerToThread.size() != 0 || !this.unassignedPartitionsQueue.isEmpty() || !this.unassignedPartitionsQueue.close()) continue;
                LOG.info("All consumer threads are finished, there are no more unassigned partitions. Stopping fetcher");
                break;
            }
        }
        catch (InterruptedException e) {
            errorHandler.checkAndThrowException();
            throw e;
        }
        finally {
            this.running = false;
            this.zookeeperOffsetHandler = null;
            if (periodicCommitter != null) {
                periodicCommitter.shutdown();
            }
            Thread.interrupted();
            try {
                int runningThreads;
                do {
                    runningThreads = 0;
                    Iterator threads = brokerToThread.values().iterator();
                    while (threads.hasNext()) {
                        SimpleConsumerThread t = (SimpleConsumerThread)threads.next();
                        if (t.isAlive()) {
                            t.cancel();
                            ++runningThreads;
                            continue;
                        }
                        threads.remove();
                    }
                    if (runningThreads <= 0) continue;
                    for (SimpleConsumerThread t : brokerToThread.values()) {
                        t.join(500 / runningThreads + 1);
                    }
                } while (runningThreads > 0);
            }
            catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            catch (Throwable t) {
                LOG.error("Exception while shutting down consumer threads", t);
            }
            try {
                zookeeperOffsetHandler.close();
            }
            catch (Throwable t) {
                LOG.error("Exception while shutting down ZookeeperOffsetHandler", t);
            }
        }
    }

    public void cancel() {
        this.running = false;
        this.unassignedPartitionsQueue.addIfOpen(MARKER);
    }

    public TopicAndPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
        return new TopicAndPartition(partition.getTopic(), partition.getPartition());
    }

    public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets, @Nonnull KafkaCommitCallback commitCallback) throws Exception {
        KafkaTopicPartitionState[] partitions;
        ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
        if (zkHandler != null) {
            try {
                zkHandler.prepareAndCommitOffsets(offsets);
                commitCallback.onSuccess();
            }
            catch (Exception e) {
                if (this.running) {
                    commitCallback.onException((Throwable)e);
                    throw e;
                }
                return;
            }
        }
        for (KafkaTopicPartitionState partition : partitions = this.subscribedPartitionStates()) {
            Long offset = offsets.get(partition.getKafkaTopicPartition());
            if (offset == null) continue;
            partition.setCommittedOffset(offset.longValue());
        }
    }

    private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions, Node leader, ExceptionProxy errorHandler) throws IOException, ClassNotFoundException {
        KeyedDeserializationSchema clonedDeserializer = (KeyedDeserializationSchema)InstantiationUtil.clone(this.deserializer, (ClassLoader)this.runtimeContext.getUserCodeClassLoader());
        SimpleConsumerThread brokerThread = new SimpleConsumerThread(this, errorHandler, this.kafkaConfig, leader, seedPartitions, this.unassignedPartitionsQueue, clonedDeserializer, this.invalidOffsetBehavior);
        brokerThread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)", this.runtimeContext.getTaskName(), leader.id(), leader.host(), leader.port()));
        brokerThread.setDaemon(true);
        brokerThread.start();
        LOG.info("Starting thread {}", (Object)brokerThread.getName());
        return brokerThread;
    }

    private static List<String> getTopics(List<KafkaTopicPartitionState<TopicAndPartition>> partitions) {
        HashSet<String> uniqueTopics = new HashSet<String>();
        for (KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
            uniqueTopics.add(fp.getTopic());
        }
        return new ArrayList<String>(uniqueTopics);
    }

    private static Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> findLeaderForPartitions(List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToAssign, Properties kafkaProperties) throws Exception {
        if (partitionsToAssign.isEmpty()) {
            throw new IllegalArgumentException("Leader request for empty partitions list");
        }
        LOG.info("Refreshing leader information for partitions {}", partitionsToAssign);
        PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(Kafka08Fetcher.getTopics(partitionsToAssign), kafkaProperties);
        infoFetcher.start();
        KillerWatchDog watchDog = new KillerWatchDog(infoFetcher, 60000L);
        watchDog.start();
        List<KafkaTopicPartitionLeader> topicPartitionWithLeaderList = infoFetcher.getPartitions();
        ArrayList<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions = new ArrayList<KafkaTopicPartitionState<TopicAndPartition>>(partitionsToAssign);
        HashMap<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> leaderToPartitions = new HashMap<Node, List<KafkaTopicPartitionState<TopicAndPartition>>>();
        block0: for (KafkaTopicPartitionLeader partitionLeader : topicPartitionWithLeaderList) {
            if (unassignedPartitions.size() == 0) break;
            Iterator unassignedPartitionsIterator = unassignedPartitions.iterator();
            while (unassignedPartitionsIterator.hasNext()) {
                KafkaTopicPartitionState unassignedPartition = (KafkaTopicPartitionState)unassignedPartitionsIterator.next();
                if (!unassignedPartition.getKafkaTopicPartition().equals((Object)partitionLeader.getTopicPartition())) continue;
                Node leader = partitionLeader.getLeader();
                ArrayList<KafkaTopicPartitionState> partitionsOfLeader = (ArrayList<KafkaTopicPartitionState>)leaderToPartitions.get(leader);
                if (partitionsOfLeader == null) {
                    partitionsOfLeader = new ArrayList<KafkaTopicPartitionState>();
                    leaderToPartitions.put(leader, partitionsOfLeader);
                }
                partitionsOfLeader.add(unassignedPartition);
                unassignedPartitionsIterator.remove();
                continue block0;
            }
        }
        if (unassignedPartitions.size() > 0) {
            throw new RuntimeException("Unable to find a leader for partitions: " + unassignedPartitions);
        }
        LOG.debug("Partitions with assigned leaders {}", leaderToPartitions);
        return leaderToPartitions;
    }

    private static long getInvalidOffsetBehavior(Properties config) {
        String val = config.getProperty("auto.offset.reset", "largest");
        if (val.equals("largest") || val.equals("latest")) {
            return OffsetRequest.LatestTime();
        }
        return OffsetRequest.EarliestTime();
    }
}

