package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.api.OffsetRequest;
import kafka.common.TopicAndPartition;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.class */
public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
    /**
     * Sentinel partition state (topic "n/a", partition -1). {@link #cancel()} offers it to
     * {@link #unassignedPartitionsQueue} after clearing the running flag.
     */
    static final KafkaTopicPartitionState<TopicAndPartition> MARKER = new KafkaTopicPartitionState<>(new KafkaTopicPartition("n/a", -1), new TopicAndPartition("n/a", -1));
    private static final Logger LOG = LoggerFactory.getLogger(Kafka08Fetcher.class);
    /** Non-null deserialization schema; a clone of it is handed to every consumer thread that is created. */
    private final KeyedDeserializationSchema<T> deserializer;
    /** Non-null Kafka configuration, passed on to the consumer threads. */
    private final Properties kafkaConfig;
    /** Runtime context; used here for the user-code class loader and the task name. */
    private final RuntimeContext runtimeContext;
    /** Queue of partitions without a consumer thread; seeded in the constructor with all subscribed partition states. */
    private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitionsQueue;
    /** Offset-reset value derived from the "auto.offset.reset" property (see getInvalidOffsetBehavior). */
    private final long invalidOffsetBehavior;
    /** Auto-commit interval as given to the constructor; not read by any method visible in this decompiled file. */
    private final long autoCommitInterval;
    /**
     * Offset handler read by commitInternalOffsetsToKafka; may be null.
     * NOTE(review): never assigned in the visible code - presumably set inside runFetchLoop; confirm.
     */
    private volatile ZookeeperOffsetHandler zookeeperOffsetHandler;
    /** True from construction until {@link #cancel()} is called. */
    private volatile boolean running;

    /**
     * Creates the fetcher and queues every subscribed partition as "unassigned".
     *
     * @param sourceContext source context, forwarded to the base fetcher
     * @param map partition-to-offset map, forwarded unchanged to the base fetcher
     *            (presumably the initial offsets of the assigned partitions - confirm against AbstractFetcher)
     * @param serializedValue serialized periodic watermark assigner, forwarded to the base fetcher
     * @param serializedValue2 serialized punctuated watermark assigner, forwarded to the base fetcher
     * @param streamingRuntimeContext runtime context; supplies the processing time service, the auto
     *            watermark interval and the user-code class loader
     * @param keyedDeserializationSchema deserialization schema, must not be null
     * @param properties Kafka configuration, must not be null
     * @param j auto-commit interval
     * @param z flag forwarded as the last argument of the base constructor
     *            (NOTE(review): meaning not visible here - confirm against AbstractFetcher)
     * @throws Exception if the base fetcher cannot be set up
     */
    public Kafka08Fetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> map, SerializedValue<AssignerWithPeriodicWatermarks<T>> serializedValue, SerializedValue<AssignerWithPunctuatedWatermarks<T>> serializedValue2, StreamingRuntimeContext streamingRuntimeContext, KeyedDeserializationSchema<T> keyedDeserializationSchema, Properties properties, long j, boolean z) throws Exception {
        super(
                sourceContext,
                map,
                serializedValue,
                serializedValue2,
                streamingRuntimeContext.getProcessingTimeService(),
                streamingRuntimeContext.getExecutionConfig().getAutoWatermarkInterval(),
                streamingRuntimeContext.getUserCodeClassLoader(),
                z);

        // The fetcher counts as running from the moment it is constructed.
        running = true;

        // Mandatory collaborators are validated eagerly.
        deserializer = Preconditions.checkNotNull(keyedDeserializationSchema);
        kafkaConfig = Preconditions.checkNotNull(properties);
        runtimeContext = streamingRuntimeContext;

        // Derived and plain configuration values.
        invalidOffsetBehavior = getInvalidOffsetBehavior(properties);
        autoCommitInterval = j;

        // Initially no partition has a consumer thread, so all of them go into the queue.
        unassignedPartitionsQueue = new ClosableBlockingQueue<>();
        for (KafkaTopicPartitionState<TopicAndPartition> subscribed : subscribedPartitionStates()) {
            unassignedPartitionsQueue.add(subscribed);
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:68:0x02ab, code lost:
    
        org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.LOG.info("All consumer threads are finished, there are no more unassigned partitions. Stopping fetcher");
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Placeholder for the fetch loop.
     *
     * <p>The decompiler could not reconstruct this method, so the body below is NOT the real
     * implementation: it unconditionally throws. The only recovered fragment is the log statement
     * quoted in the decompiler warning above.
     *
     * <p>NOTE(review): the real body must be restored from the original sources or bytecode before
     * this class can be used.
     *
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    public void runFetchLoop() throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 1126
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop():void");
    }

    /**
     * Requests the fetcher to stop: clears the running flag and then offers the {@code MARKER}
     * sentinel to the unassigned-partitions queue (only accepted while the queue is still open).
     */
    public void cancel() {
        // Flip the flag first so that anyone who sees the marker also sees running == false.
        running = false;

        // Result intentionally ignored: a closed queue simply rejects the marker.
        unassignedPartitionsQueue.addIfOpen(MARKER);
    }

    /* renamed from: createKafkaPartitionHandle, reason: merged with bridge method [inline-methods] */
    /**
     * Builds the Kafka 0.8 handle for a Flink-side partition descriptor.
     *
     * @param kafkaTopicPartition partition whose topic name and partition id are copied
     * @return a new {@code TopicAndPartition} carrying the same topic and partition id
     */
    public TopicAndPartition m2createKafkaPartitionHandle(KafkaTopicPartition kafkaTopicPartition) {
        final String topic = kafkaTopicPartition.getTopic();
        final int partition = kafkaTopicPartition.getPartition();
        return new TopicAndPartition(topic, partition);
    }

    /**
     * Commits the given offsets through the ZooKeeper offset handler (when one is set) and then
     * records them as committed on the matching subscribed partition states.
     *
     * <p>If the handler fails while the fetcher is still running, the failure is rethrown. If it
     * fails after the fetcher stopped running, the method returns silently and the partition
     * states are left untouched.
     *
     * @param map offsets to commit, keyed by partition; partitions without an entry are skipped
     * @throws Exception the handler's failure, if the fetcher is still running
     */
    public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> map) throws Exception {
        // Read the volatile field once so the null check and the call use the same instance.
        final ZookeeperOffsetHandler handler = this.zookeeperOffsetHandler;
        if (handler != null) {
            try {
                handler.prepareAndCommitOffsets(map);
            } catch (Exception commitFailure) {
                if (!this.running) {
                    // Failures after the fetcher stopped running are not reported.
                    return;
                }
                throw commitFailure;
            }
        }

        for (KafkaTopicPartitionState<TopicAndPartition> state : subscribedPartitionStates()) {
            final Long committed = map.get(state.getKafkaTopicPartition());
            if (committed == null) {
                continue;
            }
            state.setCommittedOffset(committed.longValue());
        }
    }

    /**
     * Creates, names and starts a daemon consumer thread for the given broker.
     *
     * @param list partitions the new thread is created with
     * @param node broker the thread is created for; its id, host and port go into the thread name
     * @param exceptionProxy proxy passed to the thread (presumably for reporting failures - confirm
     *            against SimpleConsumerThread)
     * @return the already started thread
     * @throws IOException if cloning the deserializer fails
     * @throws ClassNotFoundException if cloning the deserializer fails to resolve a class
     */
    private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(List<KafkaTopicPartitionState<TopicAndPartition>> list, Node node, ExceptionProxy exceptionProxy) throws IOException, ClassNotFoundException {
        // Every thread works on its own copy of the deserializer.
        final KeyedDeserializationSchema<T> deserializerCopy =
                InstantiationUtil.clone(this.deserializer, this.runtimeContext.getUserCodeClassLoader());

        final SimpleConsumerThread<T> consumer = new SimpleConsumerThread<>(
                this,
                exceptionProxy,
                this.kafkaConfig,
                node,
                list,
                this.unassignedPartitionsQueue,
                deserializerCopy,
                this.invalidOffsetBehavior);

        final String threadName = String.format(
                "SimpleConsumer - %s - broker-%s (%s:%d)",
                this.runtimeContext.getTaskName(),
                node.id(),
                node.host(),
                node.port());
        consumer.setName(threadName);
        consumer.setDaemon(true);
        consumer.start();

        LOG.info("Starting thread {}", consumer.getName());
        return consumer;
    }

    /**
     * Collects the distinct topic names of the given partition states.
     *
     * @param list partition states to inspect
     * @return a new list holding each topic name once, in no particular order
     */
    private static List<String> getTopics(List<KafkaTopicPartitionState<TopicAndPartition>> list) {
        final HashSet<String> distinctTopics = new HashSet<>();
        for (KafkaTopicPartitionState<TopicAndPartition> state : list) {
            distinctTopics.add(state.getTopic());
        }
        return new ArrayList<>(distinctTopics);
    }

    /**
     * Looks up the leader broker of every given partition and groups the partitions by leader.
     *
     * <p>The partition metadata is obtained through a {@code PartitionInfoFetcher} that is started
     * here together with a {@code KillerWatchDog} (constructed with the value 60000 - presumably a
     * timeout in milliseconds; confirm against KillerWatchDog).
     *
     * @param list partitions to resolve; must not be empty
     * @param properties Kafka configuration handed to the metadata fetcher
     * @return map from leader node to the partitions it leads; covers every partition in {@code list}
     * @throws IllegalArgumentException if {@code list} is empty
     * @throws RuntimeException if no leader was reported for at least one partition
     * @throws Exception if fetching the partition metadata fails
     */
    private static Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> findLeaderForPartitions(List<KafkaTopicPartitionState<TopicAndPartition>> list, Properties properties) throws Exception {
        if (list.isEmpty()) {
            throw new IllegalArgumentException("Leader request for empty partitions list");
        }
        LOG.info("Refreshing leader information for partitions {}", list);

        PartitionInfoFetcher partitionInfoFetcher = new PartitionInfoFetcher(getTopics(list), properties);
        partitionInfoFetcher.start();
        new KillerWatchDog(partitionInfoFetcher, 60000L).start();
        List<KafkaTopicPartitionLeader> reportedLeaders = partitionInfoFetcher.getPartitions();

        // Working copy: entries are removed as soon as their leader is known.
        List<KafkaTopicPartitionState<TopicAndPartition>> unassigned = new ArrayList<>(list);
        Map<Node, List<KafkaTopicPartitionState<TopicAndPartition>>> leaderToPartitions = new HashMap<>();

        for (KafkaTopicPartitionLeader reported : reportedLeaders) {
            if (unassigned.isEmpty()) {
                // Every requested partition already has a leader.
                break;
            }

            // The previous form, while (true) { if (it.hasNext()) ... }, never terminated once the
            // iterator was exhausted; the loop must be bounded by hasNext().
            Iterator<KafkaTopicPartitionState<TopicAndPartition>> it = unassigned.iterator();
            while (it.hasNext()) {
                KafkaTopicPartitionState<TopicAndPartition> candidate = it.next();
                if (!candidate.getKafkaTopicPartition().equals(reported.getTopicPartition())) {
                    continue;
                }

                Node leader = reported.getLeader();
                List<KafkaTopicPartitionState<TopicAndPartition>> partitionsOfLeader = leaderToPartitions.get(leader);
                if (partitionsOfLeader == null) {
                    partitionsOfLeader = new ArrayList<>();
                    leaderToPartitions.put(leader, partitionsOfLeader);
                }
                partitionsOfLeader.add(candidate);
                it.remove();

                // One reported leader entry describes one partition; move on to the next entry.
                break;
            }
        }

        if (!unassigned.isEmpty()) {
            throw new RuntimeException("Unable to find a leader for partitions: " + unassigned);
        }
        LOG.debug("Partitions with assigned leaders {}", leaderToPartitions);
        return leaderToPartitions;
    }

    /**
     * Translates the "auto.offset.reset" property into the matching {@code OffsetRequest} constant.
     *
     * @param properties Kafka configuration; a missing property is treated as "largest"
     * @return {@code OffsetRequest.LatestTime()} for "largest" or "latest", otherwise
     *         {@code OffsetRequest.EarliestTime()}
     */
    private static long getInvalidOffsetBehavior(Properties properties) {
        final String resetPolicy = properties.getProperty("auto.offset.reset", "largest");
        if ("largest".equals(resetPolicy) || "latest".equals(resetPolicy)) {
            return OffsetRequest.LatestTime();
        }
        return OffsetRequest.EarliestTime();
    }
}
