/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.flink.annotation.Internal;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
import org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.PropertiesUtil;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Thread that fetches records for a set of partitions from one Kafka broker through the
 * legacy {@link SimpleConsumer} API and forwards them to the owning {@link Kafka08Fetcher}.
 *
 * <p>The thread starts with a list of seed partitions and picks up additional partitions from
 * the queue returned by {@link #getNewPartitionsQueue()}. Partitions it can no longer serve
 * (the broker reports it is not the leader, or the broker is unreachable after the configured
 * number of reconnects) are handed back through the {@code unassignedPartitions} queue. When no
 * partitions are left and no new ones are queued, the thread closes its queue, adds
 * {@link Kafka08Fetcher#MARKER} to {@code unassignedPartitions} and terminates.
 *
 * <p>Any {@link Throwable} that escapes the fetch loop is reported to the {@link ExceptionProxy}
 * passed to the constructor.
 *
 * @param <T> type of the records produced by the deserializer and emitted to the fetcher
 */
@Internal
class SimpleConsumerThread<T>
extends Thread {
    private static final Logger LOG = LoggerFactory.getLogger(SimpleConsumerThread.class);
    private static final KafkaTopicPartitionState<TopicAndPartition> MARKER = Kafka08Fetcher.MARKER;

    /** Consecutive fetches with out-of-range offsets tolerated before the thread fails. */
    private static final int MAX_OFFSET_OUT_OF_RANGE_RETRIES = 3;
    /** Number of offset lookups attempted before an {@link IOException} is raised. */
    private static final int MAX_OFFSET_REQUEST_ATTEMPTS = 3;
    /** Pause between closing a broken connection and opening a new one. */
    private static final long RECONNECT_BACKOFF_MILLIS = 100L;

    private final Kafka08Fetcher<T> owner;
    private final KafkaDeserializationSchema<T> deserializer;
    /** Partitions currently served by this thread; only modified from within {@link #run()}. */
    private final List<KafkaTopicPartitionState<TopicAndPartition>> partitions;
    private final Node broker;
    /** Queue through which additional partitions are handed to this thread. */
    private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> newPartitionsQueue;
    /** Queue into which partitions that this thread gives up are placed. */
    private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions;
    private final ExceptionProxy errorHandler;
    /** Time value used for the offset lookup when a fetch reports an out-of-range offset. */
    private final long invalidOffsetBehavior;
    private volatile boolean running = true;
    private volatile SimpleConsumer consumer;
    private final int soTimeout;
    private final int minBytes;
    private final int maxWait;
    private final int fetchSize;
    private final int bufferSize;
    private final int reconnectLimit;
    private final String clientId;

    /**
     * Creates the thread; no connection is opened until {@link #run()} executes.
     *
     * @param owner fetcher that receives the deserialized records
     * @param errorHandler receiver of any error that terminates the fetch loop
     * @param config consumer properties; connection and fetch settings are read from it
     * @param broker broker to connect to
     * @param seedPartitions initial partitions; the list is used directly (not copied) and is
     *     modified by this thread
     * @param unassignedPartitions queue for partitions this thread hands back
     * @param deserializer schema used to turn key/value bytes into records
     * @param invalidOffsetBehavior time value passed to the offset lookup for partitions whose
     *     offset is out of range
     * @throws IllegalArgumentException if a seed partition has no defined starting offset
     * @throws NullPointerException if {@code deserializer} or {@code unassignedPartitions} is null
     */
    public SimpleConsumerThread(Kafka08Fetcher<T> owner, ExceptionProxy errorHandler, Properties config, Node broker, List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions, ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> unassignedPartitions, KafkaDeserializationSchema<T> deserializer, long invalidOffsetBehavior) {
        this.owner = owner;
        this.errorHandler = errorHandler;
        this.broker = broker;
        checkAllPartitionsHaveDefinedStartingOffsets(seedPartitions);
        this.partitions = seedPartitions;
        this.deserializer = Objects.requireNonNull(deserializer);
        this.unassignedPartitions = Objects.requireNonNull(unassignedPartitions);
        this.newPartitionsQueue = new ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>>();
        this.invalidOffsetBehavior = invalidOffsetBehavior;

        this.soTimeout = PropertiesUtil.getInt(config, "socket.timeout.ms", 30000);
        this.minBytes = PropertiesUtil.getInt(config, "fetch.min.bytes", 1);
        this.maxWait = PropertiesUtil.getInt(config, "fetch.wait.max.ms", 100);
        this.fetchSize = PropertiesUtil.getInt(config, "fetch.message.max.bytes", 0x100000);
        this.bufferSize = PropertiesUtil.getInt(config, "socket.receive.buffer.bytes", 65536);
        this.reconnectLimit = PropertiesUtil.getInt(config, "flink.simple-consumer-reconnectLimit", 3);

        // the client id falls back to the group id, which itself falls back to a per-broker default
        String groupId = config.getProperty("group.id", "flink-kafka-consumer-legacy-" + broker.id());
        this.clientId = config.getProperty("client.id", groupId);
    }

    /** Returns the queue through which additional partitions can be handed to this thread. */
    public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() {
        return this.newPartitionsQueue;
    }

    /**
     * Main fetch loop: connects to the broker, resolves symbolic start offsets, then repeatedly
     * fetches, deserializes and emits records until cancelled or out of partitions.
     */
    @Override
    public void run() {
        LOG.info("Starting to fetch from {}", partitions);
        try {
            consumer = createConsumer();

            // replace "earliest"/"latest" placeholders by concrete offsets
            requestAndSetEarliestOrLatestOffsetsFromKafka(consumer, partitions);

            LOG.info("Starting to consume {} partitions with consumer thread {}", partitions.size(), getName());

            int offsetOutOfRangeCount = 0;
            int reconnects = 0;

            while (running) {
                // ---- pick up partitions that were newly handed to this thread ----
                List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions = newPartitionsQueue.pollBatch();
                if (newPartitions != null) {
                    subscribeNewPartitions(newPartitions);
                }

                // ---- nothing left to read: try to shut down ----
                if (partitions.isEmpty()) {
                    if (!newPartitionsQueue.close()) {
                        // the queue could not be closed; go around again to poll it
                        continue;
                    }
                    running = false;
                    LOG.info("Consumer thread {} does not have any partitions assigned anymore. Stopping thread.", getName());
                    unassignedPartitions.add(MARKER);
                    break;
                }

                // ---- build the fetch request ----
                FetchRequestBuilder frb = new FetchRequestBuilder();
                frb.clientId(clientId);
                frb.maxWait(maxWait);
                frb.minBytes(minBytes);
                for (KafkaTopicPartitionState<TopicAndPartition> partition : partitions) {
                    // the state holds the last consumed offset, so request the one after it
                    frb.addFetch(partition.getKafkaTopicPartition().getTopic(), partition.getKafkaTopicPartition().getPartition(), partition.getOffset() + 1L, fetchSize);
                }
                FetchRequest fetchRequest = frb.build();
                LOG.debug("Issuing fetch request {}", fetchRequest);

                // ---- fetch, reconnecting on closed channels ----
                FetchResponse fetchResponse;
                try {
                    fetchResponse = consumer.fetch(fetchRequest);
                }
                catch (Throwable cce) {
                    // Caught as Throwable and tested with instanceof, as in the original code;
                    // presumably fetch() does not declare ClosedChannelException.
                    if (cce instanceof ClosedChannelException) {
                        LOG.warn("Fetch failed because of ClosedChannelException.");
                        LOG.debug("Full exception", cce);
                        if (++reconnects >= reconnectLimit) {
                            LOG.warn("Unable to reach broker after {} retries. Returning all current partitions", reconnectLimit);
                            for (KafkaTopicPartitionState<TopicAndPartition> fp : partitions) {
                                unassignedPartitions.add(fp);
                            }
                            // with no partitions left, the next iteration takes the shutdown path
                            partitions.clear();
                            continue;
                        }
                        try {
                            consumer.close();
                        }
                        catch (Throwable t) {
                            LOG.warn("Error while closing consumer connection", t);
                        }
                        Thread.sleep(RECONNECT_BACKOFF_MILLIS);
                        consumer = createConsumer();
                        continue;
                    }
                    throw cce;
                }
                reconnects = 0;

                if (fetchResponse == null) {
                    throw new IOException("Fetch from Kafka failed (request returned null)");
                }

                // ---- per-partition error handling ----
                if (fetchResponse.hasError()) {
                    StringBuilder exception = new StringBuilder();
                    List<KafkaTopicPartitionState<TopicAndPartition>> partitionsToGetOffsetsFor = new ArrayList<KafkaTopicPartitionState<TopicAndPartition>>();
                    boolean partitionsRemoved = false;

                    Iterator<KafkaTopicPartitionState<TopicAndPartition>> errorIterator = partitions.iterator();
                    while (errorIterator.hasNext()) {
                        KafkaTopicPartitionState<TopicAndPartition> fp = errorIterator.next();
                        short code = fetchResponse.errorCode(fp.getTopic(), fp.getPartition());

                        if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                            // offset is invalid: look up a new one below and retry
                            partitionsToGetOffsetsFor.add(fp);
                        } else if (code == ErrorMapping.NotLeaderForPartitionCode() || code == ErrorMapping.LeaderNotAvailableCode() || code == ErrorMapping.BrokerNotAvailableCode() || code == ErrorMapping.UnknownCode()) {
                            // this broker cannot serve the partition: hand it back for reassignment
                            LOG.warn("{} is not the leader of {}. Reassigning leader for partition", broker, fp);
                            LOG.debug("Error code = {}", code);
                            unassignedPartitions.add(fp);
                            errorIterator.remove();
                            partitionsRemoved = true;
                        } else if (code != ErrorMapping.NoError()) {
                            exception.append("\nException for ").append(fp.getTopic()).append(":").append(fp.getPartition()).append(": ").append(ExceptionUtils.stringifyException(ErrorMapping.exceptionFor(code)));
                        }
                    }

                    if (!partitionsToGetOffsetsFor.isEmpty()) {
                        if (offsetOutOfRangeCount++ > MAX_OFFSET_OUT_OF_RANGE_RETRIES) {
                            throw new RuntimeException("Found invalid offsets more than three times in partitions " + partitionsToGetOffsetsFor + " Exceptions: " + exception);
                        }
                        LOG.warn("The following partitions had an invalid offset: {}", partitionsToGetOffsetsFor);
                        requestAndSetSpecificTimeOffsetsFromKafka(consumer, partitionsToGetOffsetsFor, invalidOffsetBehavior);
                        LOG.warn("The new partition offsets are {}", partitionsToGetOffsetsFor);
                        continue;
                    }
                    if (partitionsRemoved) {
                        // retry the fetch with the remaining partitions
                        continue;
                    }
                    throw new IOException("Error while fetching from broker '" + broker + "': " + exception);
                }
                offsetOutOfRangeCount = 0;

                // ---- deserialize and emit the fetched messages ----
                int messagesInFetch = 0;
                int deletedMessages = 0;
                Iterator<KafkaTopicPartitionState<TopicAndPartition>> partitionsIterator = partitions.iterator();

                partitionsLoop:
                while (partitionsIterator.hasNext()) {
                    KafkaTopicPartitionState<TopicAndPartition> currentPartition = partitionsIterator.next();
                    ByteBufferMessageSet messageSet = fetchResponse.messageSet(currentPartition.getTopic(), currentPartition.getPartition());

                    for (MessageAndOffset msg : messageSet) {
                        if (!running) {
                            // cancelled mid-batch: leave right away; the finally block closes the consumer
                            return;
                        }
                        messagesInFetch++;
                        ByteBuffer payload = msg.message().payload();
                        long offset = msg.offset();

                        if (offset <= currentPartition.getOffset()) {
                            // the fetch returned a message at or before the last consumed offset
                            LOG.info("Skipping message with offset {} because we have seen messages until (including) {} from topic/partition {}/{} already", offset, currentPartition.getOffset(), currentPartition.getTopic(), currentPartition.getPartition());
                            continue;
                        }

                        // a null payload is counted as a deleted message and passed on as a null value
                        byte[] valueBytes;
                        if (payload == null) {
                            deletedMessages++;
                            valueBytes = null;
                        } else {
                            valueBytes = new byte[payload.remaining()];
                            payload.get(valueBytes);
                        }

                        // a negative key size means the message has no key
                        byte[] keyBytes = null;
                        int keySize = msg.message().keySize();
                        if (keySize >= 0) {
                            ByteBuffer keyPayload = msg.message().key();
                            keyBytes = new byte[keySize];
                            keyPayload.get(keyBytes);
                        }

                        T value = deserializer.deserialize(new ConsumerRecord<byte[], byte[]>(currentPartition.getTopic(), currentPartition.getPartition(), keyBytes, valueBytes, offset));
                        if (deserializer.isEndOfStream(value)) {
                            // stop reading this partition; the end-of-stream record is not emitted
                            partitionsIterator.remove();
                            continue partitionsLoop;
                        }
                        owner.emitRecord(value, currentPartition, offset);
                    }
                }
                LOG.debug("This fetch contained {} messages ({} deleted messages)", messagesInFetch, deletedMessages);
            }

            if (!newPartitionsQueue.close()) {
                throw new Exception("Bug: Cleanly leaving fetcher thread without having a closed queue.");
            }
        }
        catch (Throwable t) {
            // report to the fetcher's error handler
            errorHandler.reportError(t);
        }
        finally {
            if (consumer != null) {
                try {
                    consumer.close();
                }
                catch (Throwable t) {
                    LOG.error("Error while closing the Kafka simple consumer", t);
                }
            }
        }
    }

    /**
     * Stops the fetch loop: clears the running flag, closes the current consumer (if any) and
     * interrupts this thread. The interrupt is delivered even if closing the consumer throws;
     * such an exception still propagates to the caller.
     */
    public void cancel() {
        this.running = false;
        // read the volatile field once so the null check and the call see the same instance
        SimpleConsumer currentConsumer = this.consumer;
        try {
            if (currentConsumer != null) {
                currentConsumer.close();
            }
        }
        finally {
            this.interrupt();
        }
    }

    /** Opens a new connection to this thread's broker using the configured socket settings. */
    private SimpleConsumer createConsumer() {
        return new SimpleConsumer(broker.host(), broker.port(), soTimeout, bufferSize, clientId);
    }

    /**
     * Validates the given partitions, resolves their symbolic start offsets and appends them
     * to the partitions served by this thread.
     *
     * @throws IllegalStateException if one of the partitions is already subscribed
     */
    private void subscribeNewPartitions(List<KafkaTopicPartitionState<TopicAndPartition>> newPartitions) throws Exception {
        checkAllPartitionsHaveDefinedStartingOffsets(newPartitions);
        requestAndSetEarliestOrLatestOffsetsFromKafka(consumer, newPartitions);

        for (KafkaTopicPartitionState<TopicAndPartition> newPartition : newPartitions) {
            if (partitions.contains(newPartition)) {
                throw new IllegalStateException("Adding partition " + newPartition + " to subscribed partitions even though it is already subscribed");
            }
            partitions.add(newPartition);
        }

        LOG.info("Adding {} new partitions to consumer thread {}", newPartitions.size(), getName());
        LOG.debug("Partitions list: {}", newPartitions);
    }

    /**
     * Looks up, for every given partition, the offset for the time value {@code whichTime}
     * and stores it in the partition state.
     *
     * @param consumer connection used for the lookup
     * @param partitions partitions to update
     * @param whichTime time value placed into each {@link PartitionOffsetRequestInfo}
     * @throws IOException if the lookup keeps failing
     */
    private static void requestAndSetSpecificTimeOffsetsFromKafka(SimpleConsumer consumer, List<KafkaTopicPartitionState<TopicAndPartition>> partitions, long whichTime) throws IOException {
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
            requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(whichTime, 1));
        }
        requestAndSetOffsetsFromKafka(consumer, partitions, requestInfo);
    }

    /**
     * Resolves the offsets of those partitions whose current offset equals the
     * {@code EarliestTime} or {@code LatestTime} placeholder; all other partitions are left
     * untouched.
     *
     * @param consumer connection used for the lookup
     * @param partitions partitions to inspect
     */
    private static void requestAndSetEarliestOrLatestOffsetsFromKafka(SimpleConsumer consumer, List<KafkaTopicPartitionState<TopicAndPartition>> partitions) throws Exception {
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
            long current = part.getOffset();
            if (current == kafka.api.OffsetRequest.EarliestTime() || current == kafka.api.OffsetRequest.LatestTime()) {
                requestInfo.put(part.getKafkaPartitionHandle(), new PartitionOffsetRequestInfo(current, 1));
            }
        }
        requestAndSetOffsetsFromKafka(consumer, partitions, requestInfo);
    }

    /**
     * Sends the offset request and writes the returned offsets into the partition states.
     * Only partitions that have an entry in {@code partitionToRequestInfo} are inspected and
     * updated. The stored value is the returned offset minus one, because the fetch loop
     * requests {@code getOffset() + 1}.
     *
     * @param consumer connection used for the lookup
     * @param partitionStates candidate partitions to update
     * @param partitionToRequestInfo the actual request contents; if empty, no request is sent
     * @throws IOException if the response still reports errors after
     *     {@link #MAX_OFFSET_REQUEST_ATTEMPTS} attempts, or contains no offset for a partition
     */
    private static void requestAndSetOffsetsFromKafka(SimpleConsumer consumer, List<KafkaTopicPartitionState<TopicAndPartition>> partitionStates, Map<TopicAndPartition, PartitionOffsetRequestInfo> partitionToRequestInfo) throws IOException {
        if (partitionToRequestInfo.isEmpty()) {
            // nothing to look up, so no partition state would change: skip the broker round trip
            return;
        }

        int retries = 0;
        OffsetResponse response;
        while (true) {
            OffsetRequest request = new OffsetRequest(partitionToRequestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
            response = consumer.getOffsetsBefore(request);
            if (!response.hasError()) {
                break;
            }

            StringBuilder exception = new StringBuilder();
            for (KafkaTopicPartitionState<TopicAndPartition> part : partitionStates) {
                // only requested partitions are queried; the response is not assumed to
                // carry entries for anything else
                if (!partitionToRequestInfo.containsKey(part.getKafkaPartitionHandle())) {
                    continue;
                }
                short code = response.errorCode(part.getTopic(), part.getPartition());
                if (code != ErrorMapping.NoError()) {
                    exception.append("\nException for topic=").append(part.getTopic()).append(" partition=").append(part.getPartition()).append(": ").append(ExceptionUtils.stringifyException(ErrorMapping.exceptionFor(code)));
                }
            }
            if (++retries >= MAX_OFFSET_REQUEST_ATTEMPTS) {
                throw new IOException("Unable to get last offset for partitions " + partitionStates + ": " + exception.toString());
            }
            LOG.warn("Unable to get last offset for partitions: Exception(s): {}", exception);
        }

        for (KafkaTopicPartitionState<TopicAndPartition> part : partitionStates) {
            if (!partitionToRequestInfo.containsKey(part.getKafkaPartitionHandle())) {
                continue;
            }
            long[] offsets = response.offsets(part.getTopic(), part.getPartition());
            if (offsets.length == 0) {
                throw new IOException("Offset response contained no offset for topic=" + part.getTopic() + " partition=" + part.getPartition());
            }
            part.setOffset(offsets[0] - 1L);
        }
    }

    /**
     * Verifies that every partition reports a defined offset.
     *
     * @throws IllegalArgumentException if a partition's offset is not defined
     */
    private static void checkAllPartitionsHaveDefinedStartingOffsets(List<KafkaTopicPartitionState<TopicAndPartition>> partitions) {
        for (KafkaTopicPartitionState<TopicAndPartition> part : partitions) {
            if (!part.isOffsetDefined()) {
                throw new IllegalArgumentException("SimpleConsumerThread received a partition with undefined starting offset");
            }
        }
    }
}

