package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.util.KafkaUtils;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.class */
public class LegacyFetcher implements Fetcher {
    private static final Logger LOG = LoggerFactory.getLogger(LegacyFetcher.class);
    private static final FetchPartition MARKER = new FetchPartition("n/a", -1, -1);
    private final Properties config;
    private final String taskName;
    private final AtomicReference<Throwable> error;
    private final ClassLoader userCodeClassloader;
    private volatile Thread mainThread;
    private volatile boolean running = true;
    private final ClosableBlockingQueue<FetchPartition> unassignedPartitions = new ClosableBlockingQueue<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher$FetchPartition.class */
    public static class FetchPartition {
        final String topic;
        final int partition;
        long nextOffsetToRead;

        FetchPartition(String str, int i, long j) {
            this.topic = str;
            this.partition = i;
            this.nextOffsetToRead = j;
        }

        public String toString() {
            return "FetchPartition {topic=" + this.topic + ", partition=" + this.partition + ", offset=" + this.nextOffsetToRead + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher$KillerWatchDog.class */
    public static class KillerWatchDog extends Thread {
        private final Thread toKill;
        private final long timeout;

        private KillerWatchDog(Thread thread, long j) {
            super("KillerWatchDog");
            setDaemon(true);
            this.toKill = thread;
            this.timeout = j;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            long currentTimeMillis = System.currentTimeMillis() + this.timeout;
            while (this.toKill.isAlive()) {
                long currentTimeMillis2 = System.currentTimeMillis();
                if (currentTimeMillis2 >= currentTimeMillis) {
                    break;
                } else {
                    try {
                        this.toKill.join(currentTimeMillis - currentTimeMillis2);
                    } catch (InterruptedException e) {
                    }
                }
            }
            if (this.toKill.isAlive()) {
                this.toKill.stop();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher$PartitionInfoFetcher.class */
    public static class PartitionInfoFetcher extends Thread {
        private final List<String> topics;
        private final Properties properties;
        private volatile List<KafkaTopicPartitionLeader> result;
        private volatile Throwable error;

        PartitionInfoFetcher(List<String> list, Properties properties) {
            this.topics = list;
            this.properties = properties;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                this.result = FlinkKafkaConsumer08.getPartitionsForTopic(this.topics, this.properties);
            } catch (Throwable th) {
                this.error = th;
            }
        }

        public List<KafkaTopicPartitionLeader> getPartitions() throws Exception {
            try {
                join();
                if (this.error != null) {
                    throw new Exception("Failed to fetch partitions for topics " + this.topics.toString(), this.error);
                }
                if (this.result != null) {
                    return this.result;
                }
                throw new Exception("Partition fetching failed");
            } catch (InterruptedException e) {
                throw new Exception("Partition fetching was cancelled before completion");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher$SimpleConsumerThread.class */
    public static class SimpleConsumerThread<T> extends Thread {
        private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumerThread.class);
        private final SourceFunction.SourceContext<T> sourceContext;
        private final KeyedDeserializationSchema<T> deserializer;
        private final HashMap<KafkaTopicPartition, Long> offsetsState;
        private final List<FetchPartition> partitions;
        private final Node broker;
        private final Properties config;
        private final LegacyFetcher owner;
        private final ClosableBlockingQueue<FetchPartition> unassignedPartitions;
        private volatile boolean running = true;
        private final ClosableBlockingQueue<FetchPartition> newPartitionsQueue = new ClosableBlockingQueue<>();
        private SimpleConsumer consumer;
        private final int soTimeout;
        private final int minBytes;
        private final int maxWait;
        private final int fetchSize;
        private final int bufferSize;
        private final int reconnectLimit;

        public SimpleConsumerThread(LegacyFetcher legacyFetcher, Properties properties, Node node, List<FetchPartition> list, ClosableBlockingQueue<FetchPartition> closableBlockingQueue, SourceFunction.SourceContext<T> sourceContext, KeyedDeserializationSchema<T> keyedDeserializationSchema, HashMap<KafkaTopicPartition, Long> hashMap) {
            this.owner = legacyFetcher;
            this.config = properties;
            this.broker = node;
            this.partitions = list;
            this.sourceContext = (SourceFunction.SourceContext) Objects.requireNonNull(sourceContext);
            this.deserializer = (KeyedDeserializationSchema) Objects.requireNonNull(keyedDeserializationSchema);
            this.offsetsState = (HashMap) Objects.requireNonNull(hashMap);
            this.unassignedPartitions = (ClosableBlockingQueue) Objects.requireNonNull(closableBlockingQueue);
            this.soTimeout = KafkaUtils.getIntFromConfig(properties, "socket.timeout.ms", 30000);
            this.minBytes = KafkaUtils.getIntFromConfig(properties, "fetch.min.bytes", 1);
            this.maxWait = KafkaUtils.getIntFromConfig(properties, "fetch.wait.max.ms", 100);
            this.fetchSize = KafkaUtils.getIntFromConfig(properties, "fetch.message.max.bytes", 1048576);
            this.bufferSize = KafkaUtils.getIntFromConfig(properties, "socket.receive.buffer.bytes", 65536);
            this.reconnectLimit = KafkaUtils.getIntFromConfig(properties, "flink.simple-consumer-reconnectLimit", 3);
        }

        /* JADX WARN: Code restructure failed: missing block: B:178:0x0604, code lost:
        
            if (r9.newPartitionsQueue.close() != false) goto L129;
         */
        /* JADX WARN: Code restructure failed: missing block: B:180:0x0611, code lost:
        
            throw new java.lang.Exception("Bug: Cleanly leaving fetcher thread without having a closed queue.");
         */
        /* JADX WARN: Code restructure failed: missing block: B:183:0x0616, code lost:
        
            if (r9.consumer == null) goto L151;
         */
        /* JADX WARN: Code restructure failed: missing block: B:184:0x0682, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:186:0x0619, code lost:
        
            r9.consumer.close();
         */
        /* JADX WARN: Code restructure failed: missing block: B:188:?, code lost:
        
            return;
         */
        /* JADX WARN: Code restructure failed: missing block: B:189:0x0623, code lost:
        
            r12 = move-exception;
         */
        /* JADX WARN: Code restructure failed: missing block: B:190:0x0624, code lost:
        
            org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.SimpleConsumerThread.LOG.error("Error while closing the Kafka simple consumer", r12);
         */
        /* JADX WARN: Code restructure failed: missing block: B:191:?, code lost:
        
            return;
         */
        @Override // java.lang.Thread, java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 1667
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.streaming.connectors.kafka.internals.LegacyFetcher.SimpleConsumerThread.run():void");
        }

        private void getMissingOffsetsFromKafka(List<FetchPartition> list) {
            ArrayList<FetchPartition> arrayList = new ArrayList();
            for (FetchPartition fetchPartition : list) {
                if (fetchPartition.nextOffsetToRead == FlinkKafkaConsumer08.OFFSET_NOT_SET) {
                    arrayList.add(fetchPartition);
                }
            }
            if (arrayList.size() > 0) {
                getLastOffset(this.consumer, arrayList, getInvalidOffsetBehavior(this.config));
                LOG.info("No prior offsets found for some partitions. Fetched the following start offsets {}", arrayList);
                synchronized (this.sourceContext.getCheckpointLock()) {
                    for (FetchPartition fetchPartition2 : arrayList) {
                        this.offsetsState.put(new KafkaTopicPartition(fetchPartition2.topic, fetchPartition2.partition), Long.valueOf(fetchPartition2.nextOffsetToRead - 1));
                    }
                }
            }
        }

        public void cancel() {
            this.running = false;
            if (this.consumer != null) {
                this.consumer.close();
            }
            interrupt();
        }

        public ClosableBlockingQueue<FetchPartition> getNewPartitionsQueue() {
            return this.newPartitionsQueue;
        }

        private static void getLastOffset(SimpleConsumer simpleConsumer, List<FetchPartition> list, long j) {
            HashMap hashMap = new HashMap();
            for (FetchPartition fetchPartition : list) {
                hashMap.put(new TopicAndPartition(fetchPartition.topic, fetchPartition.partition), new PartitionOffsetRequestInfo(j, 1));
            }
            OffsetResponse offsetsBefore = simpleConsumer.getOffsetsBefore(new OffsetRequest(hashMap, kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId()));
            if (!offsetsBefore.hasError()) {
                for (FetchPartition fetchPartition2 : list) {
                    fetchPartition2.nextOffsetToRead = offsetsBefore.offsets(fetchPartition2.topic, fetchPartition2.partition)[0];
                }
                return;
            }
            String str = "";
            for (FetchPartition fetchPartition3 : list) {
                short errorCode = offsetsBefore.errorCode(fetchPartition3.topic, fetchPartition3.partition);
                if (errorCode != ErrorMapping.NoError()) {
                    str = str + "\nException for partition " + fetchPartition3.partition + ": " + StringUtils.stringifyException(ErrorMapping.exceptionFor(errorCode));
                }
            }
            throw new RuntimeException("Unable to get last offset for partitions " + list + ". " + str);
        }

        private static long getInvalidOffsetBehavior(Properties properties) {
            String property = properties.getProperty("auto.offset.reset", "largest");
            if (property.equals("none")) {
                throw new RuntimeException("Unable to find previous offset in consumer group. Set 'auto.offset.reset' to 'latest' or 'earliest' to automatically get the offset from Kafka");
            }
            return (property.equals("largest") || property.equals("latest")) ? kafka.api.OffsetRequest.LatestTime() : kafka.api.OffsetRequest.EarliestTime();
        }
    }

    public LegacyFetcher(Map<KafkaTopicPartition, Long> map, Properties properties, String str, ClassLoader classLoader) {
        this.config = (Properties) Objects.requireNonNull(properties, "The config properties cannot be null");
        this.userCodeClassloader = (ClassLoader) Objects.requireNonNull(classLoader);
        if (map.size() == 0) {
            throw new IllegalArgumentException("List of initial partitions is empty");
        }
        for (Map.Entry<KafkaTopicPartition, Long> entry : map.entrySet()) {
            KafkaTopicPartition key = entry.getKey();
            long longValue = entry.getValue().longValue();
            if (longValue >= 0 && longValue != FlinkKafkaConsumer08.OFFSET_NOT_SET) {
                longValue++;
            }
            this.unassignedPartitions.add(new FetchPartition(key.getTopic(), key.getPartition(), longValue));
        }
        this.taskName = str;
        this.error = new AtomicReference<>();
    }

    @Override // org.apache.flink.streaming.connectors.kafka.internals.Fetcher
    public void close() {
        this.running = false;
    }

    @Override // org.apache.flink.streaming.connectors.kafka.internals.Fetcher
    public <T> void run(SourceFunction.SourceContext<T> sourceContext, KeyedDeserializationSchema<T> keyedDeserializationSchema, HashMap<KafkaTopicPartition, Long> hashMap) throws Exception {
        int i;
        this.mainThread = Thread.currentThread();
        HashMap hashMap2 = new HashMap();
        while (this.running && this.error.get() == null) {
            try {
                try {
                    List<FetchPartition> batchBlocking = this.unassignedPartitions.getBatchBlocking(5000L);
                    batchBlocking.remove(MARKER);
                    if (batchBlocking.isEmpty()) {
                        Iterator it = hashMap2.values().iterator();
                        while (it.hasNext()) {
                            SimpleConsumerThread simpleConsumerThread = (SimpleConsumerThread) it.next();
                            if (!simpleConsumerThread.getNewPartitionsQueue().isOpen()) {
                                LOG.info("Removing stopped consumer thread {}", simpleConsumerThread.getName());
                                it.remove();
                            }
                        }
                    } else {
                        LOG.info("Assigning {} partitions to broker threads", Integer.valueOf(batchBlocking.size()));
                        for (Map.Entry<Node, List<FetchPartition>> entry : findLeaderForPartitions(batchBlocking).entrySet()) {
                            Node key = entry.getKey();
                            List<FetchPartition> value = entry.getValue();
                            SimpleConsumerThread simpleConsumerThread2 = (SimpleConsumerThread) hashMap2.get(key);
                            if (!this.running) {
                                break;
                            }
                            if (simpleConsumerThread2 == null || !simpleConsumerThread2.getNewPartitionsQueue().isOpen()) {
                                hashMap2.put(key, createAndStartSimpleConsumerThread(sourceContext, keyedDeserializationSchema, hashMap, value, key));
                            } else {
                                ClosableBlockingQueue<FetchPartition> newPartitionsQueue = simpleConsumerThread2.getNewPartitionsQueue();
                                for (FetchPartition fetchPartition : value) {
                                    if (!newPartitionsQueue.addIfOpen(fetchPartition)) {
                                        ArrayList arrayList = new ArrayList();
                                        arrayList.add(fetchPartition);
                                        SimpleConsumerThread<T> createAndStartSimpleConsumerThread = createAndStartSimpleConsumerThread(sourceContext, keyedDeserializationSchema, hashMap, arrayList, key);
                                        hashMap2.put(key, createAndStartSimpleConsumerThread);
                                        newPartitionsQueue = createAndStartSimpleConsumerThread.getNewPartitionsQueue();
                                    }
                                }
                            }
                        }
                    }
                } catch (InterruptedException e) {
                }
                if (hashMap2.size() == 0 && this.unassignedPartitions.isEmpty() && this.unassignedPartitions.close()) {
                    LOG.info("All consumer threads are finished, there are no more unassigned partitions. Stopping fetcher");
                    break;
                }
            } finally {
                do {
                    i = 0;
                    for (SimpleConsumerThread simpleConsumerThread3 : hashMap2.values()) {
                        if (simpleConsumerThread3.isAlive()) {
                            simpleConsumerThread3.cancel();
                            i++;
                        }
                    }
                    if (i > 0) {
                        Thread.sleep(500L);
                    }
                } while (i > 0);
            }
        }
        Throwable th = this.error.get();
        if (th != null) {
            throw new Exception(th.getMessage(), th);
        }
    }

    private <T> SimpleConsumerThread<T> createAndStartSimpleConsumerThread(SourceFunction.SourceContext<T> sourceContext, KeyedDeserializationSchema<T> keyedDeserializationSchema, HashMap<KafkaTopicPartition, Long> hashMap, List<FetchPartition> list, Node node) throws IOException, ClassNotFoundException {
        SimpleConsumerThread<T> simpleConsumerThread = new SimpleConsumerThread<>(this, this.config, node, list, this.unassignedPartitions, sourceContext, InstantiationUtil.clone(keyedDeserializationSchema, this.userCodeClassloader), hashMap);
        simpleConsumerThread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)", this.taskName, Integer.valueOf(node.id()), node.host(), Integer.valueOf(node.port())));
        simpleConsumerThread.setDaemon(true);
        simpleConsumerThread.start();
        LOG.info("Starting thread {}", simpleConsumerThread.getName());
        return simpleConsumerThread;
    }

    private Map<Node, List<FetchPartition>> findLeaderForPartitions(List<FetchPartition> list) throws Exception {
        if (list.size() == 0) {
            throw new IllegalArgumentException("Leader request for empty partitions list");
        }
        LOG.info("Refreshing leader information for partitions {}", list);
        PartitionInfoFetcher partitionInfoFetcher = new PartitionInfoFetcher(getTopics(list), this.config);
        partitionInfoFetcher.start();
        new KillerWatchDog(partitionInfoFetcher, 60000L).start();
        List<KafkaTopicPartitionLeader> partitions = partitionInfoFetcher.getPartitions();
        ArrayList arrayList = new ArrayList(list);
        HashMap hashMap = new HashMap();
        for (KafkaTopicPartitionLeader kafkaTopicPartitionLeader : partitions) {
            if (arrayList.size() == 0) {
                break;
            }
            Iterator it = arrayList.iterator();
            while (true) {
                if (it.hasNext()) {
                    FetchPartition fetchPartition = (FetchPartition) it.next();
                    if (fetchPartition.topic.equals(kafkaTopicPartitionLeader.getTopicPartition().getTopic()) && fetchPartition.partition == kafkaTopicPartitionLeader.getTopicPartition().getPartition()) {
                        Node leader = kafkaTopicPartitionLeader.getLeader();
                        List list2 = (List) hashMap.get(leader);
                        if (list2 == null) {
                            list2 = new ArrayList();
                            hashMap.put(leader, list2);
                        }
                        list2.add(fetchPartition);
                        it.remove();
                    }
                }
            }
        }
        if (arrayList.size() > 0) {
            throw new RuntimeException("Unable to find a leader for partitions: " + arrayList);
        }
        LOG.debug("Partitions with assigned leaders {}", hashMap);
        return hashMap;
    }

    @Override // org.apache.flink.streaming.connectors.kafka.internals.Fetcher
    public void stopWithError(Throwable th) {
        if (!this.error.compareAndSet(null, th) || this.mainThread == null) {
            return;
        }
        this.mainThread.interrupt();
    }

    public static List<String> getTopics(List<FetchPartition> list) {
        HashSet hashSet = new HashSet();
        Iterator<FetchPartition> it = list.iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().topic);
        }
        return new ArrayList(hashSet);
    }
}
