/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkKafkaConsumer09<T>
extends FlinkKafkaConsumerBase<T> {
    private static final long serialVersionUID = 2324564345203409112L;
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer09.class);
    public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    public static final long DEFAULT_POLL_TIMEOUT = 100L;
    private final Properties properties;
    private final List<KafkaTopicPartition> partitionInfos;
    private final String consumerId;
    private transient List<TopicPartition> subscribedPartitions;
    private transient List<KafkaTopicPartition> subscribedPartitionsAsFlink;
    private transient KafkaConsumer<byte[], byte[]> consumer;
    private transient ConsumerThread<T> consumerThread;
    private transient Throwable consumerThreadException;
    private transient Thread waitThread;

    public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(Collections.singletonList(topic), valueDeserializer, props);
    }

    public FlinkKafkaConsumer09(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
        this(Collections.singletonList(topic), deserializer, props);
    }

    public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
        this(topics, (KeyedDeserializationSchema<T>)new KeyedDeserializationSchemaWrapper(deserializer), props);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
        super(deserializer, props);
        Objects.requireNonNull(topics, "topics");
        this.properties = Objects.requireNonNull(props, "props");
        FlinkKafkaConsumer09.setDeserializer(this.properties);
        try (KafkaConsumer consumer = null;){
            consumer = new KafkaConsumer(this.properties);
            this.partitionInfos = new ArrayList<KafkaTopicPartition>();
            for (String topic : topics) {
                List partitionsForTopic = null;
                for (int tri = 0; tri < 10; ++tri) {
                    LOG.info("Trying to get partitions for topic {}", (Object)topic);
                    try {
                        partitionsForTopic = consumer.partitionsFor(topic);
                        if (partitionsForTopic != null && partitionsForTopic.size() > 0) {
                            break;
                        }
                    }
                    catch (NullPointerException nullPointerException) {
                        // empty catch block
                    }
                    consumer.close();
                    try {
                        Thread.sleep(1000L);
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                    consumer = new KafkaConsumer(this.properties);
                }
                if (partitionsForTopic == null) continue;
                this.partitionInfos.addAll(FlinkKafkaConsumer09.convertToFlinkKafkaTopicPartition(partitionsForTopic));
            }
        }
        if (this.partitionInfos.isEmpty()) {
            throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
        }
        LOG.info("Got {} partitions from these topics: {}", (Object)this.partitionInfos.size(), topics);
        if (LOG.isInfoEnabled()) {
            FlinkKafkaConsumer09.logPartitionInfo(this.partitionInfos);
        }
        this.consumerId = UUID.randomUUID().toString();
    }

    public static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions) {
        Objects.requireNonNull(partitions, "The given list of partitions was null");
        ArrayList<KafkaTopicPartition> ret = new ArrayList<KafkaTopicPartition>(partitions.size());
        for (PartitionInfo pi : partitions) {
            ret.add(new KafkaTopicPartition(pi.topic(), pi.partition()));
        }
        return ret;
    }

    public static List<TopicPartition> convertToKafkaTopicPartition(List<KafkaTopicPartition> partitions) {
        ArrayList<TopicPartition> ret = new ArrayList<TopicPartition>(partitions.size());
        for (KafkaTopicPartition ktp : partitions) {
            ret.add(new TopicPartition(ktp.getTopic(), ktp.getPartition()));
        }
        return ret;
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        int numConsumers = this.getRuntimeContext().getNumberOfParallelSubtasks();
        int thisConsumerIndex = this.getRuntimeContext().getIndexOfThisSubtask();
        this.subscribedPartitionsAsFlink = FlinkKafkaConsumer09.assignPartitions(this.partitionInfos, (int)numConsumers, (int)thisConsumerIndex);
        if (this.subscribedPartitionsAsFlink.isEmpty()) {
            LOG.info("This consumer doesn't have any partitions assigned");
            this.offsetsState = null;
            return;
        }
        StreamingRuntimeContext streamingRuntimeContext = (StreamingRuntimeContext)this.getRuntimeContext();
        this.properties.setProperty("enable.auto.commit", Boolean.toString(!streamingRuntimeContext.isCheckpointingEnabled()));
        this.consumer = new KafkaConsumer(this.properties);
        this.subscribedPartitions = FlinkKafkaConsumer09.convertToKafkaTopicPartition(this.subscribedPartitionsAsFlink);
        this.consumer.assign(this.subscribedPartitions);
        if (!Boolean.valueOf(this.properties.getProperty(KEY_DISABLE_METRICS, "false")).booleanValue()) {
            Map metrics = this.consumer.metrics();
            if (metrics == null) {
                LOG.info("Consumer implementation does not support metrics");
            } else {
                for (Map.Entry metric : metrics.entrySet()) {
                    String name = this.consumerId + "-consumer-" + ((MetricName)metric.getKey()).name();
                    DefaultKafkaMetricAccumulator kafkaAccumulator = DefaultKafkaMetricAccumulator.createFor((Metric)((Metric)metric.getValue()));
                    if (kafkaAccumulator == null) continue;
                    this.getRuntimeContext().addAccumulator(name, (Accumulator)kafkaAccumulator);
                }
            }
        }
        if (this.restoreToOffset != null) {
            for (Map.Entry offset : this.restoreToOffset.entrySet()) {
                this.consumer.seek(new TopicPartition(((KafkaTopicPartition)offset.getKey()).getTopic(), ((KafkaTopicPartition)offset.getKey()).getPartition()), (Long)offset.getValue() + 1L);
            }
            this.offsetsState = this.restoreToOffset;
        } else {
            this.offsetsState = new HashMap();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        if (this.consumer != null) {
            this.consumerThread = new ConsumerThread<T>(this, sourceContext);
            this.consumerThread.setDaemon(true);
            this.consumerThread.start();
            while (this.consumerThread.isAlive()) {
                if (this.consumerThreadException != null) {
                    throw new RuntimeException("ConsumerThread threw an exception: " + this.consumerThreadException.getMessage(), this.consumerThreadException);
                }
                try {
                    this.consumerThread.join(50L);
                }
                catch (InterruptedException ie) {
                    this.consumerThread.shutdown();
                }
            }
            if (this.consumerThreadException != null) {
                throw new RuntimeException("ConsumerThread threw an exception: " + this.consumerThreadException.getMessage(), this.consumerThreadException);
            }
        } else {
            sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
            Object waitLock = new Object();
            this.waitThread = Thread.currentThread();
            while (this.running) {
                try {
                    Object object = waitLock;
                    synchronized (object) {
                        waitLock.wait();
                    }
                }
                catch (InterruptedException interruptedException) {
                }
            }
        }
        sourceContext.close();
    }

    public void cancel() {
        this.running = false;
        if (this.consumerThread != null) {
            this.consumerThread.shutdown();
        } else if (this.waitThread != null) {
            this.waitThread.interrupt();
        }
    }

    public void close() throws Exception {
        this.cancel();
        super.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void commitOffsets(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
        Map<TopicPartition, OffsetAndMetadata> kafkaCheckpointOffsets = FlinkKafkaConsumer09.convertToCommitMap(checkpointOffsets);
        KafkaConsumer<byte[], byte[]> kafkaConsumer = this.consumer;
        synchronized (kafkaConsumer) {
            this.consumer.commitSync(kafkaCheckpointOffsets);
        }
    }

    public static Map<TopicPartition, OffsetAndMetadata> convertToCommitMap(HashMap<KafkaTopicPartition, Long> checkpointOffsets) {
        HashMap<TopicPartition, OffsetAndMetadata> ret = new HashMap<TopicPartition, OffsetAndMetadata>(checkpointOffsets.size());
        for (Map.Entry<KafkaTopicPartition, Long> partitionOffset : checkpointOffsets.entrySet()) {
            ret.put(new TopicPartition(partitionOffset.getKey().getTopic(), partitionOffset.getKey().getPartition()), new OffsetAndMetadata(partitionOffset.getValue().longValue(), ""));
        }
        return ret;
    }

    protected static void setDeserializer(Properties props) {
        if (!props.contains("key.deserializer")) {
            props.put("key.deserializer", ByteArrayDeserializer.class.getCanonicalName());
        } else {
            LOG.warn("Overwriting the '{}' is not recommended", (Object)"key.deserializer");
        }
        if (!props.contains("value.deserializer")) {
            props.put("value.deserializer", ByteArrayDeserializer.class.getCanonicalName());
        } else {
            LOG.warn("Overwriting the '{}' is not recommended", (Object)"value.deserializer");
        }
    }

    private void stopWithError(Throwable t) {
        this.consumerThreadException = t;
    }

    private static class ConsumerThread<T>
    extends Thread {
        private final FlinkKafkaConsumer09<T> flinkKafkaConsumer;
        private final SourceFunction.SourceContext<T> sourceContext;
        private boolean running = true;

        public ConsumerThread(FlinkKafkaConsumer09<T> flinkKafkaConsumer, SourceFunction.SourceContext<T> sourceContext) {
            this.flinkKafkaConsumer = flinkKafkaConsumer;
            this.sourceContext = sourceContext;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            block26: {
                try {
                    long pollTimeout = Long.parseLong(((FlinkKafkaConsumer09)this.flinkKafkaConsumer).properties.getProperty(FlinkKafkaConsumer09.KEY_POLL_TIMEOUT, Long.toString(100L)));
                    while (this.running) {
                        ConsumerRecords records;
                        KafkaConsumer kafkaConsumer = ((FlinkKafkaConsumer09)this.flinkKafkaConsumer).consumer;
                        synchronized (kafkaConsumer) {
                            try {
                                records = ((FlinkKafkaConsumer09)this.flinkKafkaConsumer).consumer.poll(pollTimeout);
                            }
                            catch (WakeupException we) {
                                if (this.running) {
                                    throw we;
                                }
                                continue;
                            }
                        }
                        for (int i = 0; i < ((FlinkKafkaConsumer09)this.flinkKafkaConsumer).subscribedPartitions.size(); ++i) {
                            TopicPartition partition = (TopicPartition)((FlinkKafkaConsumer09)this.flinkKafkaConsumer).subscribedPartitions.get(i);
                            KafkaTopicPartition flinkPartition = (KafkaTopicPartition)((FlinkKafkaConsumer09)this.flinkKafkaConsumer).subscribedPartitionsAsFlink.get(i);
                            List partitionRecords = records.records(partition);
                            for (int j = 0; j < partitionRecords.size(); ++j) {
                                ConsumerRecord record = (ConsumerRecord)partitionRecords.get(j);
                                Object value = this.flinkKafkaConsumer.deserializer.deserialize((byte[])record.key(), (byte[])record.value(), record.topic(), record.partition(), record.offset());
                                if (this.flinkKafkaConsumer.deserializer.isEndOfStream(value)) {
                                    this.running = false;
                                    break block26;
                                }
                                Object object = this.sourceContext.getCheckpointLock();
                                synchronized (object) {
                                    this.sourceContext.collect(value);
                                    this.flinkKafkaConsumer.offsetsState.put(flinkPartition, record.offset());
                                    continue;
                                }
                            }
                        }
                    }
                }
                catch (Throwable t) {
                    if (this.running) {
                        ((FlinkKafkaConsumer09)this.flinkKafkaConsumer).stopWithError(t);
                    } else {
                        LOG.debug("Stopped ConsumerThread threw exception", t);
                    }
                }
                finally {
                    try {
                        ((FlinkKafkaConsumer09)this.flinkKafkaConsumer).consumer.close();
                    }
                    catch (Throwable t) {
                        LOG.warn("Error while closing consumer", t);
                    }
                }
            }
        }

        public void shutdown() {
            this.running = false;
            ((FlinkKafkaConsumer09)this.flinkKafkaConsumer).consumer.wakeup();
        }
    }
}

