package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import kafka.api.FetchRequestBuilder;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.class */
/**
 * Reads the messages of a single partition of a Kafka topic through the low-level
 * {@link SimpleConsumer} API.
 *
 * <p>Usage: construct, call {@link #initialize()} once to locate the partition leader and
 * perform the first fetch, then alternate {@link #fetchHasNext()} and
 * {@link #nextWithOffset()} (or {@link #next()}).
 *
 * <p>{@link #fetchHasNext()} and {@link #nextWithOffset()} are synchronized on this instance.
 * {@link #initialize()} is not synchronized.
 *
 * <p>The consumer, the current fetch response and its iterator are transient, so
 * {@link #initialize()} has to be called again on a deserialized instance.
 */
public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Serializable {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePartitionIterator.class);

    /** Milliseconds to wait between two attempts to locate the partition leader. */
    private static final long DEFAULT_WAIT_ON_EMPTY_FETCH = 10000;

    /** Fetch size (in bytes) handed to every fetch request. */
    private static final int FETCH_SIZE = 100000;

    /** How many times {@link #findNewLeader} queries the replicas before giving up. */
    private static final int LEADER_LOOKUP_ATTEMPTS = 3;

    /** Seed brokers queried for partition metadata in {@link #initialize()}. */
    private List<String> hosts = new ArrayList<String>();
    private String topic;
    /** Port used for the seed brokers, the replicas and the leader alike. */
    private int port;
    private int partition;
    /** Offset that the next fetch request starts from. */
    private long readOffset;
    private transient SimpleConsumer consumer;
    /** Hosts of the replicas reported by the most recent successful metadata lookup. */
    private List<String> replicaBrokers;
    private String clientName;
    private String leadBroker;
    private final int connectTimeoutMs;
    private final int bufferSize;
    /** Strategy that yields the offset at which reading starts. */
    private KafkaOffset initialOffset;
    private transient Iterator<MessageAndOffset> iter;
    private transient FetchResponse fetchResponse;

    /**
     * Creates an iterator for one partition. No connection is opened until
     * {@link #initialize()} is called.
     *
     * @param hostName seed broker host used to look up the partition leader
     * @param port port used for every broker connection
     * @param topic topic to read from
     * @param partition id of the partition to read
     * @param initialOffset supplies the offset to start reading from
     * @param connectTimeoutMs timeout value passed to every {@link SimpleConsumer}
     * @param bufferSize buffer size passed to every {@link SimpleConsumer}
     */
    public KafkaSinglePartitionIterator(String hostName, int port, String topic, int partition,
            KafkaOffset initialOffset, int connectTimeoutMs, int bufferSize) {
        this.hosts.add(hostName);
        this.port = port;
        this.connectTimeoutMs = connectTimeoutMs;
        this.bufferSize = bufferSize;
        this.topic = topic;
        this.partition = partition;
        this.initialOffset = initialOffset;
        this.replicaBrokers = new ArrayList<String>();
    }

    /**
     * Locates the partition leader, connects to it, resolves the initial read offset and
     * performs the first fetch.
     *
     * <p>While no metadata for the partition is available the lookup is repeated every
     * {@link #DEFAULT_WAIT_ON_EMPTY_FETCH} milliseconds; no wait happens when the first
     * lookup already succeeds.
     *
     * @throws InterruptedException if the thread is interrupted while waiting for metadata,
     *         or if the first fetch fails and no new leader can be found
     * @throws RuntimeException if the metadata names no leader or a seed broker cannot be
     *         contacted
     */
    @Override
    public void initialize() throws InterruptedException {
        PartitionMetadata metadata = findLeader(this.hosts, this.port, this.topic, this.partition);
        while (metadata == null) {
            try {
                Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
            } catch (InterruptedException e) {
                throw new InterruptedException("Establishing connection to Kafka failed");
            }
            metadata = findLeader(this.hosts, this.port, this.topic, this.partition);
        }
        if (metadata.leader() == null) {
            throw new RuntimeException("Can't find Leader for Topic and Partition. (at "
                    + this.hosts.get(0) + ":" + this.port + ")");
        }
        this.leadBroker = metadata.leader().host();
        this.clientName = "Client_" + this.topic + "_" + this.partition;
        // NOTE(review): the leader is contacted on the configured port, not on the port
        // advertised in the metadata - confirm all brokers really share one port.
        this.consumer = new SimpleConsumer(this.leadBroker, this.port, this.connectTimeoutMs,
                this.bufferSize, this.clientName);
        this.readOffset = this.initialOffset.getOffset(this.consumer, this.topic, this.partition,
                this.clientName);
        resetFetchResponse(this.readOffset);
    }

    /**
     * Changes the partition id used by subsequent lookups and fetches.
     *
     * @param i the new partition id
     */
    public void setPartition(int i) {
        this.partition = i;
    }

    /**
     * Always reports {@code true}; use {@link #fetchHasNext()} to find out whether a message
     * is actually available.
     */
    @Override
    public boolean hasNext() {
        return true;
    }

    /**
     * Returns the payload of the next message, see {@link #nextWithOffset()}.
     */
    @Override
    public byte[] next() throws InterruptedException {
        return nextWithOffset().getMessage();
    }

    /**
     * Tells whether a message is available, issuing a new fetch from the current read offset
     * when the current response is exhausted.
     *
     * @return {@code true} if {@link #nextWithOffset()} can be called
     * @throws InterruptedException if a failed fetch could not be recovered by finding a new
     *         leader
     */
    public synchronized boolean fetchHasNext() throws InterruptedException {
        // Locking on 'this' rather than on fetchResponse: that field is replaced by
        // resetFetchResponse(), so it cannot serve as a stable monitor.
        if (this.iter.hasNext()) {
            return true;
        }
        resetFetchResponse(this.readOffset);
        return this.iter.hasNext();
    }

    /**
     * Returns the next message together with its offset and the partition id, and advances
     * the read offset past it. Messages whose offset lies before the read offset are skipped.
     *
     * @return the next message at or after the current read offset
     * @throws RuntimeException if the current fetch response holds no further message
     * @throws java.util.NoSuchElementException if the response only holds messages that lie
     *         before the current read offset
     */
    @Override
    public synchronized MessageWithMetadata nextWithOffset() throws InterruptedException {
        if (!this.iter.hasNext()) {
            throw new RuntimeException("Trying to read when response is not fetched. Call fetchHasNext() first.");
        }
        MessageAndOffset current = this.iter.next();
        while (current.offset() < this.readOffset) {
            if (!this.iter.hasNext()) {
                throw new java.util.NoSuchElementException(
                        "Fetched messages end before the requested offset " + this.readOffset);
            }
            current = this.iter.next();
        }
        this.readOffset = current.nextOffset();
        ByteBuffer payload = current.message().payload();
        // remaining() is the number of bytes get() can deliver from the current position.
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        return new MessageWithMetadata(current.offset(), bytes, this.partition);
    }

    /**
     * Fetches from the given offset and points {@link #iter} at the result.
     *
     * <p>If the response reports an error, the read offset is reset via {@link CurrentOffset}
     * when the requested offset was out of range, the connection is closed, a new leader is
     * looked up and a consumer for that leader is created so that the next fetch can succeed.
     * The iterator still refers to the failed response after such a recovery.
     *
     * @param offset offset to start fetching from
     * @throws InterruptedException if no new leader can be found after a failed fetch
     */
    private void resetFetchResponse(long offset) throws InterruptedException {
        this.fetchResponse = this.consumer.fetch(new FetchRequestBuilder()
                .clientId(this.clientName)
                .addFetch(this.topic, this.partition, offset, FETCH_SIZE)
                .build());
        if (this.fetchResponse.hasError()) {
            short errorCode = this.fetchResponse.errorCode(this.topic, this.partition);
            LOG.error("Error fetching data from the Broker:{} Reason: {}", this.leadBroker, errorCode);
            if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
                LOG.error("Asked for invalid offset {}, setting the offset to the latest.", offset);
                this.readOffset = new CurrentOffset().getOffset(this.consumer, this.topic,
                        this.partition, this.clientName);
            }
            this.consumer.close();
            this.consumer = null;
            this.leadBroker = findNewLeader(this.leadBroker, this.topic, this.partition, this.port);
            // Reconnect; without this every later fetch would dereference a null consumer.
            this.consumer = new SimpleConsumer(this.leadBroker, this.port, this.connectTimeoutMs,
                    this.bufferSize, this.clientName);
        }
        this.iter = this.fetchResponse.messageSet(this.topic, this.partition).iterator();
    }

    /**
     * Asks the given brokers, in order, for the metadata of one partition and stops at the
     * first broker that knows it. On success {@link #replicaBrokers} is replaced by the hosts
     * of the replicas listed in that metadata.
     *
     * @param brokerHosts brokers to query
     * @param brokerPort port the brokers are contacted on
     * @param topicName topic the partition belongs to
     * @param partitionId id of the partition
     * @return the partition metadata, or {@code null} if no queried broker reported it
     * @throws RuntimeException if communication with a queried broker fails
     */
    private PartitionMetadata findLeader(List<String> brokerHosts, int brokerPort, String topicName,
            int partitionId) {
        PartitionMetadata found = null;
        // brokerHosts may be replicaBrokers itself; it is only modified after this loop.
        for (String brokerHost : brokerHosts) {
            found = lookupPartition(brokerHost, brokerPort, topicName, partitionId);
            if (found != null) {
                break;
            }
        }
        if (found != null) {
            this.replicaBrokers.clear();
            for (Broker replica : found.replicas()) {
                this.replicaBrokers.add(replica.host());
            }
        }
        return found;
    }

    /**
     * Queries a single broker for the metadata of one partition. The lookup connection is
     * always closed before returning.
     *
     * @return the matching metadata, or {@code null} if the broker did not report the partition
     * @throws RuntimeException if the broker cannot be queried
     */
    private PartitionMetadata lookupPartition(String brokerHost, int brokerPort, String topicName,
            int partitionId) {
        SimpleConsumer lookupConsumer = null;
        try {
            lookupConsumer = new SimpleConsumer(brokerHost, brokerPort, this.connectTimeoutMs,
                    this.bufferSize, "leaderLookup");
            TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList(topicName));
            for (TopicMetadata topicMetadata : lookupConsumer.send(request).topicsMetadata()) {
                for (PartitionMetadata candidate : topicMetadata.partitionsMetadata()) {
                    if (candidate.partitionId() == partitionId) {
                        return candidate;
                    }
                }
            }
            return null;
        } catch (Exception e) {
            throw new RuntimeException("Error communicating with Broker [" + brokerHost
                    + "] to find Leader for [" + topicName + ", " + partitionId + "]", e);
        } finally {
            if (lookupConsumer != null) {
                lookupConsumer.close();
            }
        }
    }

    /**
     * Determines the leader to use after a failed fetch by querying the known replicas up to
     * {@link #LEADER_LOOKUP_ATTEMPTS} times, waiting {@link #DEFAULT_WAIT_ON_EMPTY_FETCH}
     * milliseconds between attempts.
     *
     * <p>On the first attempt a result equal to the old leader is not accepted; from the
     * second attempt on any reported leader is returned, including the old one.
     *
     * @param oldLeader host of the leader the failed fetch was sent to
     * @param topicName topic the partition belongs to
     * @param partitionId id of the partition
     * @param brokerPort port the replicas are contacted on
     * @return host of the leader to connect to
     * @throws InterruptedException if no leader was found within the allowed attempts, or if
     *         the thread is interrupted while waiting between attempts
     */
    private String findNewLeader(String oldLeader, String topicName, int partitionId, int brokerPort)
            throws InterruptedException {
        for (int attempt = 0; attempt < LEADER_LOOKUP_ATTEMPTS; attempt++) {
            LOG.info("Trying to find a new leader after Broker failure.");
            PartitionMetadata metadata = findLeader(this.replicaBrokers, brokerPort, topicName, partitionId);
            if (metadata != null && metadata.leader() != null) {
                String candidate = metadata.leader().host();
                boolean unchangedOnFirstTry = attempt == 0 && candidate.equalsIgnoreCase(oldLeader);
                if (!unchangedOnFirstTry) {
                    return candidate;
                }
            }
            // Waiting is pointless once no further attempt follows. An interrupt is
            // propagated instead of being swallowed.
            if (attempt < LEADER_LOOKUP_ATTEMPTS - 1) {
                Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
            }
        }
        throw new InterruptedException("Unable to find new leader after Broker failure.");
    }
}
