package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.class */
public class LegacyFetcher implements Fetcher {
    /**
     * Logger shared by the fetcher and its nested threads.
     * NOTE(review): it is registered under {@code FlinkKafkaConsumer}, not {@code LegacyFetcher},
     * so log output appears under the consumer's category - confirm this is intended.
     */
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer.class);
    /** Name of the Kafka topic to read; handed to the partition lookup and to every consumer thread. */
    private final String topic;
    /** Consumer configuration; handed to the partition lookup and to every consumer thread. */
    private final Properties config;
    /** Name of the owning task; only used to build the names of the consumer threads. */
    private final String taskName;
    /**
     * Partitions to read, mapped to the offset to start from. Populated by
     * {@code setPartitionsToRead} (with {@code FlinkKafkaConsumer.OFFSET_NOT_SET}) and refined by {@code seek}.
     */
    private Map<TopicPartition, Long> partitionsToRead;
    /** Thread that executes {@code run}; interrupted by {@code stopWithError} to wake it from waiting. */
    private volatile Thread mainThread;
    /** Cleared by {@code close()}; checked by {@code run} before starting threads and while waiting for them. */
    private volatile boolean running = true;
    /** First error reported through {@code stopWithError}; later reports are ignored. */
    private final AtomicReference<Throwable> error = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher$FetchPartition.class */
    /**
     * Mutable pairing of a Kafka partition id with the offset of the next message to fetch from it.
     */
    public static class FetchPartition {
        /** Id of the partition within the topic. */
        int partition;
        /** Offset that the next fetch request asks for. */
        long nextOffsetToRead;

        /**
         * @param partition        id of the partition
         * @param nextOffsetToRead offset to request first
         */
        FetchPartition(int partition, long nextOffsetToRead) {
            this.partition = partition;
            this.nextOffsetToRead = nextOffsetToRead;
        }

        /** Renders the partition id and next offset for log messages. */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("FetchPartition {partition=");
            text.append(partition);
            text.append(", offset=");
            text.append(nextOffsetToRead);
            text.append('}');
            return text.toString();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher$KillerWatchDog.class */
    /**
     * Daemon thread that waits a bounded time for another thread to finish and
     * forcibly stops that thread if it is still alive once the time is up.
     */
    private static class KillerWatchDog extends Thread {
        /** Thread being supervised. */
        private final Thread toKill;
        /** Maximum time, in milliseconds, the supervised thread is allowed to run. */
        private final long timeout;

        /**
         * @param toKill  thread to supervise
         * @param timeout grace period in milliseconds, counted from the moment this watchdog starts running
         */
        private KillerWatchDog(Thread toKill, long timeout) {
            super("KillerWatchDog");
            setDaemon(true);
            this.toKill = toKill;
            this.timeout = timeout;
        }

        /**
         * Waits until the supervised thread dies or the deadline passes, whichever comes first.
         * Interrupts of the watchdog itself are ignored so the deadline is always honoured.
         */
        @Override
        public void run() {
            final long deadline = System.currentTimeMillis() + timeout;

            long now;
            while (toKill.isAlive() && (now = System.currentTimeMillis()) < deadline) {
                try {
                    toKill.join(deadline - now);
                } catch (InterruptedException ignored) {
                    // deliberately ignored: keep waiting until the deadline or the thread's death
                }
            }

            // Still running after the deadline: stop it the hard way.
            // NOTE(review): Thread.stop() is deprecated; kept because this is the existing contract.
            if (toKill.isAlive()) {
                toKill.stop();
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher$PartitionInfoFetcher.class */
    /**
     * Thread that looks up the partition metadata of a topic via
     * {@code FlinkKafkaConsumer.getPartitionsForTopic} so the caller can wait for the
     * result with {@link #getPartitions()} while another thread bounds the lookup time.
     */
    private static class PartitionInfoFetcher extends Thread {
        /** Topic whose partitions are looked up. */
        private final String topic;
        /** Configuration handed to the lookup. */
        private final Properties properties;
        /** Lookup result; stays {@code null} until the lookup succeeds. */
        private volatile List<PartitionInfo> result;
        /** Anything thrown by the lookup; stays {@code null} on success. */
        private volatile Throwable error;

        /**
         * @param topic      topic to look up
         * @param properties configuration for the lookup
         */
        PartitionInfoFetcher(String topic, Properties properties) {
            this.topic = topic;
            this.properties = properties;
        }

        /** Performs the lookup and records either the result or the failure. */
        @Override
        public void run() {
            try {
                result = FlinkKafkaConsumer.getPartitionsForTopic(topic, properties);
            } catch (Throwable t) {
                // everything is recorded here and surfaced to the waiting thread by getPartitions()
                error = t;
            }
        }

        /**
         * Blocks until this thread has terminated and returns the partitions it found.
         *
         * @return the partition metadata of the topic
         * @throws Exception if the lookup failed (the failure is attached as cause), if it
         *                   ended without producing a result, or if the calling thread was
         *                   interrupted while waiting (the interrupt status is restored and
         *                   the {@link InterruptedException} is attached as cause)
         */
        public List<PartitionInfo> getPartitions() throws Exception {
            try {
                join();
            } catch (InterruptedException e) {
                // keep the interruption visible to the caller's callers and keep the cause
                Thread.currentThread().interrupt();
                throw new Exception("Partition fetching was cancelled before completion", e);
            }

            final Throwable failure = error;
            if (failure != null) {
                throw new Exception("Failed to fetch partitions for topic " + topic, failure);
            }

            final List<PartitionInfo> partitions = result;
            if (partitions == null) {
                throw new Exception("Partition fetching failed");
            }
            return partitions;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher$SimpleConsumerThread.class */
    private static class SimpleConsumerThread<T> extends Thread {
        /** Context that deserialized records are emitted to; its checkpoint lock guards emit + offset update. */
        private final SourceFunction.SourceContext<T> sourceContext;
        /** Turns the raw message payload bytes into records of type {@code T}. */
        private final DeserializationSchema<T> valueDeserializer;
        /** Shared array, indexed by partition id, receiving the offset of the last emitted record. */
        private final long[] offsetsState;
        /** Partitions this thread fetches, together with their next offsets. */
        private final FetchPartition[] partitions;
        /** Broker this thread connects to. */
        private final Node broker;
        /** Topic to fetch from. */
        private final String topic;
        /** Consumer configuration (socket and fetch settings, offset reset behaviour). */
        private final Properties config;
        /** Fetcher that is notified through {@code stopWithError} when this thread fails. */
        private final LegacyFetcher owner;
        /**
         * Connection to the broker, created inside {@code run()}.
         * NOTE(review): not volatile although {@code cancel()} reads it from another thread - confirm visibility is acceptable.
         */
        private SimpleConsumer consumer;
        /** Cleared by {@code cancel()}; checked by the fetch loop. */
        private volatile boolean running = true;

        /**
         * Creates a consumer thread for one broker. The thread is not started here.
         *
         * @param owner             fetcher to notify when this thread fails
         * @param config            consumer configuration
         * @param topic             topic to fetch from
         * @param broker            broker to connect to
         * @param partitions        partitions to fetch from that broker
         * @param sourceContext     context to emit records to; must not be null
         * @param valueDeserializer deserializer for message payloads; must not be null
         * @param offsetsState      array, indexed by partition id, that receives emitted offsets; must not be null
         */
        public SimpleConsumerThread(LegacyFetcher owner, Properties config, String topic, Node broker, FetchPartition[] partitions, SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> valueDeserializer, long[] offsetsState) {
            // mandatory collaborators are validated up front
            this.sourceContext = Preconditions.checkNotNull(sourceContext);
            this.valueDeserializer = Preconditions.checkNotNull(valueDeserializer);
            this.offsetsState = Preconditions.checkNotNull(offsetsState);

            this.owner = owner;
            this.config = config;
            this.topic = topic;
            this.broker = broker;
            this.partitions = partitions;
        }

        /**
         * Connects to the broker, resolves start offsets for partitions that have none, and then
         * fetches, deserializes and emits messages until {@code cancel()} clears the running flag.
         *
         * <p>Any failure is reported to the owning fetcher through {@code stopWithError}. The broker
         * connection is closed on every exit path; failures while closing are only logged.
         */
        @Override
        public void run() {
            try {
                final String clientId = "flink-kafka-consumer-legacy-" + broker.id();

                // connection and fetch settings, with the defaults this fetcher falls back to
                final int soTimeout = Integer.parseInt(config.getProperty("socket.timeout.ms", "30000"));
                final int bufferSize = Integer.parseInt(config.getProperty("socket.receive.buffer.bytes", "65536"));
                final int fetchSize = Integer.parseInt(config.getProperty("fetch.message.max.bytes", "1048576"));
                final int maxWait = Integer.parseInt(config.getProperty("fetch.wait.max.ms", "100"));
                final int minBytes = Integer.parseInt(config.getProperty("fetch.min.bytes", "1"));

                consumer = new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);

                // partitions without a known offset get one from the broker first
                final List<FetchPartition> withoutOffset = new ArrayList<FetchPartition>();
                for (FetchPartition fp : partitions) {
                    if (fp.nextOffsetToRead == FlinkKafkaConsumer.OFFSET_NOT_SET) {
                        withoutOffset.add(fp);
                    }
                }
                if (!withoutOffset.isEmpty()) {
                    getLastOffset(consumer, topic, withoutOffset, getInvalidOffsetBehavior(config));
                    LegacyFetcher.LOG.info("No prior offsets found for some partitions in topic {}. Fetched the following start offsets {}", topic, withoutOffset);
                }

                // number of fetches so far that reported out-of-range offsets; only one is tolerated
                int invalidOffsetRounds = 0;

                while (running) {
                    final FetchRequestBuilder requestBuilder = new FetchRequestBuilder();
                    requestBuilder.clientId(clientId);
                    requestBuilder.maxWait(maxWait);
                    requestBuilder.minBytes(minBytes);
                    for (FetchPartition fp : partitions) {
                        requestBuilder.addFetch(topic, fp.partition, fp.nextOffsetToRead, fetchSize);
                    }
                    final FetchRequest request = requestBuilder.build();
                    LegacyFetcher.LOG.debug("Issuing fetch request {}", request);

                    final FetchResponse response = consumer.fetch(request);

                    if (response.hasError()) {
                        // sort the per-partition errors into "offset out of range" and everything else
                        final StringBuilder otherErrors = new StringBuilder();
                        final List<FetchPartition> outOfRange = new ArrayList<FetchPartition>();
                        for (FetchPartition fp : partitions) {
                            final short code = response.errorCode(topic, fp.partition);
                            if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                                outOfRange.add(fp);
                            } else if (code != ErrorMapping.NoError()) {
                                otherErrors.append("\nException for partition ")
                                        .append(fp.partition)
                                        .append(": ")
                                        .append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code)));
                            }
                        }

                        if (outOfRange.isEmpty()) {
                            throw new IOException("Error while fetching from broker: " + otherErrors);
                        }
                        if (invalidOffsetRounds++ > 0) {
                            throw new RuntimeException("Found invalid offsets more than once in partitions " + outOfRange.toString() + " Exceptions: " + otherErrors);
                        }

                        // first occurrence: reset the affected partitions and fetch again
                        LegacyFetcher.LOG.warn("The following partitions had an invalid offset: {}", outOfRange);
                        getLastOffset(consumer, topic, outOfRange, getInvalidOffsetBehavior(config));
                        LegacyFetcher.LOG.warn("The new partition offsets are {}", outOfRange);
                        continue;
                    }

                    int messagesInFetch = 0;
                    for (FetchPartition fp : partitions) {
                        final ByteBufferMessageSet messages = response.messageSet(topic, fp.partition);
                        final int partitionId = fp.partition;

                        for (MessageAndOffset msg : messages) {
                            if (!running) {
                                // cancelled in the middle of a batch; the finally block closes the consumer
                                return;
                            }
                            messagesInFetch++;

                            if (msg.offset() < fp.nextOffsetToRead) {
                                // a fetch may return messages from before the requested offset
                                LegacyFetcher.LOG.info("Skipping message with offset " + msg.offset() + " because we have seen messages until " + fp.nextOffsetToRead + " from partition " + fp.partition + " already");
                                continue;
                            }

                            final ByteBuffer payload = msg.message().payload();
                            final byte[] valueBytes = new byte[payload.remaining()];
                            payload.get(valueBytes);

                            final T value = valueDeserializer.deserialize(valueBytes);
                            final long offset = msg.offset();

                            // emit and record the offset under the checkpoint lock so both change together
                            synchronized (sourceContext.getCheckpointLock()) {
                                sourceContext.collect(value);
                                offsetsState[partitionId] = offset;
                            }
                            fp.nextOffsetToRead = offset + 1;
                        }
                    }
                    LegacyFetcher.LOG.debug("This fetch contained {} messages", messagesInFetch);
                }
            } catch (Throwable t) {
                // hand the failure to the owning fetcher
                owner.stopWithError(t);
            } finally {
                if (consumer != null) {
                    try {
                        consumer.close();
                    } catch (Throwable t) {
                        LegacyFetcher.LOG.error("Error while closing the Kafka simple consumer", t);
                    }
                }
            }
        }

        /**
         * Asks this thread to stop: clears the running flag, closes the broker connection
         * (if one exists) and interrupts the thread.
         *
         * <p>A failure while closing the connection is logged instead of thrown, so the thread is
         * always interrupted and a caller cancelling several threads in a row is never cut short.
         */
        public void cancel() {
            running = false;

            // read the field once: it is assigned by the consumer thread itself
            final SimpleConsumer current = consumer;
            if (current != null) {
                try {
                    current.close();
                } catch (Throwable t) {
                    LegacyFetcher.LOG.error("Error while closing the Kafka simple consumer", t);
                }
            }

            interrupt();
        }

        /**
         * Asks the broker for one offset per given partition and stores it as that partition's
         * next offset to read.
         *
         * @param consumer   connection used for the offset request
         * @param topic      topic the partitions belong to
         * @param partitions partitions to resolve; their {@code nextOffsetToRead} is overwritten
         * @param whichTime  time selector passed to the offset request for every partition
         * @throws RuntimeException if the response reports an error; the message lists the
         *                          stringified exception of every failing partition
         */
        private static void getLastOffset(SimpleConsumer consumer, String topic, List<FetchPartition> partitions, long whichTime) {
            final Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
            for (FetchPartition fp : partitions) {
                // a single offset per partition is requested
                requestInfo.put(new TopicAndPartition(topic, fp.partition), new PartitionOffsetRequestInfo(whichTime, 1));
            }

            final OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
            final OffsetResponse response = consumer.getOffsetsBefore(request);

            if (response.hasError()) {
                final StringBuilder details = new StringBuilder();
                for (FetchPartition fp : partitions) {
                    final short code = response.errorCode(topic, fp.partition);
                    if (code != ErrorMapping.NoError()) {
                        details.append("\nException for partition ")
                                .append(fp.partition)
                                .append(": ")
                                .append(StringUtils.stringifyException(ErrorMapping.exceptionFor(code)));
                    }
                }
                throw new RuntimeException("Unable to get last offset for topic " + topic + " and partitions " + partitions + ". " + details);
            }

            for (FetchPartition fp : partitions) {
                fp.nextOffsetToRead = response.offsets(topic, fp.partition)[0];
            }
        }

        /**
         * Translates the {@code auto.offset.reset} setting into the time selector used for offset requests.
         *
         * <p>Both spellings of "newest" are accepted: {@code "largest"} (the value used by the
         * Kafka 0.8 consumer configuration) and {@code "latest"} (the value used by newer
         * consumers). Previously only {@code "latest"} was recognised, so a configuration using
         * {@code "largest"} was silently treated as a request for the earliest offset.
         *
         * @param properties consumer configuration; a missing setting counts as {@code "latest"}
         * @return {@code OffsetRequest.LatestTime()} for {@code "latest"}/{@code "largest"},
         *         otherwise {@code OffsetRequest.EarliestTime()}
         */
        private static long getInvalidOffsetBehavior(Properties properties) {
            final String reset = properties.getProperty("auto.offset.reset", "latest");
            if ("latest".equals(reset) || "largest".equals(reset)) {
                return kafka.api.OffsetRequest.LatestTime();
            }
            return kafka.api.OffsetRequest.EarliestTime();
        }
    }

    /**
     * Creates a fetcher for one topic. Nothing is contacted until {@code run} is called.
     *
     * @param topic    topic to read from; must not be null
     * @param config   consumer configuration; must not be null
     * @param taskName name of the owning task, used when naming the consumer threads
     */
    public LegacyFetcher(String topic, Properties config, String taskName) {
        // the configuration is validated before the topic, as callers may rely on which message they see
        this.config = Preconditions.checkNotNull(config, "The config properties cannot be null");
        this.topic = Preconditions.checkNotNull(topic, "The topic cannot be null");
        this.taskName = taskName;
    }

    /**
     * Replaces the set of partitions to read. Every partition starts out with
     * {@code FlinkKafkaConsumer.OFFSET_NOT_SET} as its offset; use {@code seek} to set a concrete one.
     *
     * @param partitions partitions this fetcher should read
     */
    @Override
    public void setPartitionsToRead(List<TopicPartition> partitions) {
        partitionsToRead = new HashMap<TopicPartition, Long>(partitions.size());
        for (TopicPartition partition : partitions) {
            partitionsToRead.put(partition, Long.valueOf(FlinkKafkaConsumer.OFFSET_NOT_SET));
        }
    }

    /**
     * Sets the offset to start reading from for one of the registered partitions.
     *
     * @param topicPartition partition to position; must have been registered via {@code setPartitionsToRead}
     * @param offset         offset to store for that partition
     * @throws IllegalArgumentException if no partitions were registered, or the partition is not among them
     */
    @Override
    public void seek(TopicPartition topicPartition, long offset) {
        if (partitionsToRead == null) {
            throw new IllegalArgumentException("No partitions to read set");
        }
        if (partitionsToRead.containsKey(topicPartition)) {
            partitionsToRead.put(topicPartition, Long.valueOf(offset));
            return;
        }
        throw new IllegalArgumentException("Can not set offset on a partition (" + topicPartition + ") we are not going to read. Partitions to read " + partitionsToRead);
    }

    /**
     * Signals the fetcher to stop by clearing the running flag.
     *
     * <p>This method does nothing else: it neither interrupts the thread executing {@code run}
     * nor touches the consumer threads. {@code run} observes the flag before it starts threads
     * and each time its waiting loop re-evaluates its condition.
     */
    @Override
    public void close() {
        running = false;
    }

    /**
     * Reads the registered partitions until the fetcher is closed, a consumer thread fails,
     * or all consumer threads have ended.
     *
     * <p>Steps: look up the topic's partition metadata (bounded to 60 seconds by a watchdog),
     * group the registered partitions by their leader broker, start one
     * {@code SimpleConsumerThread} per broker and wait for those threads.
     *
     * <p>Once threads may have been started, every exit path (normal completion, close,
     * error, failure while starting) cancels all threads that are still alive. Previously
     * cancellation only happened inside the waiting loop, so threads kept running when the
     * loop was never entered (fetcher closed or error reported right after start), and any
     * interrupt of the waiting thread cancelled all consumers even without an error.
     *
     * @param sourceContext         context the consumer threads emit records to
     * @param deserializationSchema deserializer for message payloads
     * @param lastOffsets           array, indexed by partition id, that the consumer threads update with emitted offsets
     * @throws IllegalArgumentException if no partitions were registered
     * @throws RuntimeException         if a partition of the topic has no leader, or not every
     *                                  registered partition was matched to a leader
     * @throws Exception                if the partition lookup fails, or a consumer thread
     *                                  reported an error (attached as cause)
     */
    @Override
    public <T> void run(SourceFunction.SourceContext<T> sourceContext, DeserializationSchema<T> deserializationSchema, long[] lastOffsets) throws Exception {
        if (partitionsToRead == null || partitionsToRead.size() == 0) {
            throw new IllegalArgumentException("No partitions set");
        }

        // remembered so that stopWithError can interrupt this thread while it waits below
        this.mainThread = Thread.currentThread();

        LOG.info("Reading from partitions " + partitionsToRead + " using the legacy fetcher");

        // the metadata lookup runs in its own thread; the watchdog stops it if it takes too long
        final PartitionInfoFetcher infoFetcher = new PartitionInfoFetcher(topic, config);
        infoFetcher.start();
        new KillerWatchDog(infoFetcher, 60000L).start();
        final List<PartitionInfo> allPartitions = infoFetcher.getPartitions();

        // group the partitions we have to read by the broker that leads them
        int matched = 0;
        final Map<Node, List<FetchPartition>> partitionsByLeader = new HashMap<Node, List<FetchPartition>>();
        for (PartitionInfo info : allPartitions) {
            if (info.leader() == null) {
                throw new RuntimeException("Unable to consume partition " + info.partition() + " from topic " + info.topic() + " because it does not have a leader");
            }
            for (Map.Entry<TopicPartition, Long> toRead : partitionsToRead.entrySet()) {
                final TopicPartition partition = toRead.getKey();
                final long startOffset = toRead.getValue().longValue();
                if (partition.partition() != info.partition()) {
                    continue;
                }
                List<FetchPartition> forLeader = partitionsByLeader.get(info.leader());
                if (forLeader == null) {
                    forLeader = new ArrayList<FetchPartition>();
                    partitionsByLeader.put(info.leader(), forLeader);
                }
                forLeader.add(new FetchPartition(partition.partition(), startOffset));
                matched++;
            }
        }
        if (partitionsToRead.size() != matched) {
            throw new RuntimeException(partitionsToRead.size() + " partitions to read, but got only " + matched + " partition infos with lead brokers.");
        }

        // one (not yet started) daemon thread per leader broker
        final List<SimpleConsumerThread<T>> consumers = new ArrayList<SimpleConsumerThread<T>>(partitionsByLeader.size());
        for (Map.Entry<Node, List<FetchPartition>> group : partitionsByLeader.entrySet()) {
            final Node broker = group.getKey();
            final List<FetchPartition> brokerPartitions = group.getValue();
            final FetchPartition[] partitionArray = brokerPartitions.toArray(new FetchPartition[brokerPartitions.size()]);

            final SimpleConsumerThread<T> consumer = new SimpleConsumerThread<T>(this, config, topic, broker, partitionArray, sourceContext, deserializationSchema, lastOffsets);
            consumer.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)", taskName, Integer.valueOf(broker.id()), broker.host(), Integer.valueOf(broker.port())));
            consumer.setDaemon(true);
            consumers.add(consumer);
        }

        // closed before anything was started: nothing to run and nothing to clean up
        if (!running) {
            return;
        }

        try {
            for (SimpleConsumerThread<T> consumer : consumers) {
                LOG.info("Starting thread {}", consumer.getName());
                consumer.start();
            }

            boolean someThreadsAlive = true;
            while (running && error.get() == null && someThreadsAlive) {
                try {
                    for (SimpleConsumerThread<T> consumer : consumers) {
                        consumer.join();
                    }
                    // safety net: only leave the loop once every thread is really gone
                    someThreadsAlive = false;
                    for (SimpleConsumerThread<T> consumer : consumers) {
                        someThreadsAlive |= consumer.isAlive();
                    }
                } catch (InterruptedException ignored) {
                    // stopWithError interrupts this thread to wake it up; the loop condition
                    // decides whether there is a reason to stop waiting
                }
            }

            // surface an error reported by a consumer thread
            final Throwable failure = error.get();
            if (failure != null) {
                throw new Exception(failure.getMessage(), failure);
            }
        } finally {
            // whatever happened above, do not leave consumer threads behind
            for (SimpleConsumerThread<T> consumer : consumers) {
                if (consumer.isAlive()) {
                    try {
                        consumer.cancel();
                    } catch (Throwable t) {
                        // keep going so the remaining threads are cancelled as well
                        LOG.warn("Error while cancelling thread " + consumer.getName(), t);
                    }
                }
            }
        }
    }

    /**
     * Records an error and wakes up the thread executing {@code run}.
     *
     * <p>Only the first reported error is kept; later calls neither replace it nor interrupt
     * again. If {@code run} has not been entered yet, the error is recorded without an interrupt.
     *
     * @param th the error to report
     */
    @Override
    public void stopWithError(Throwable th) {
        if (error.compareAndSet(null, th)) {
            final Thread waiting = mainThread;
            if (waiting != null) {
                waiting.interrupt();
            }
        }
    }
}
