package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.class */
public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
    private static final long serialVersionUID = 2324564345203409112L;
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumer09.class);
    public static final String KEY_POLL_TIMEOUT = "flink.poll-timeout";
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    public static final long DEFAULT_POLL_TIMEOUT = 100;
    private final Properties properties;
    private final List<KafkaTopicPartition> partitionInfos;
    private final String consumerId;
    private transient List<TopicPartition> subscribedPartitions;
    private transient List<KafkaTopicPartition> subscribedPartitionsAsFlink;
    private transient KafkaConsumer<byte[], byte[]> consumer;
    private transient ConsumerThread<T> consumerThread;
    private transient Throwable consumerThreadException;
    private transient Thread waitThread;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09$ConsumerThread.class */
    public static class ConsumerThread<T> extends Thread {
        private final FlinkKafkaConsumer09<T> flinkKafkaConsumer;
        private final SourceFunction.SourceContext<T> sourceContext;
        private boolean running = true;

        public ConsumerThread(FlinkKafkaConsumer09<T> flinkKafkaConsumer09, SourceFunction.SourceContext<T> sourceContext) {
            this.flinkKafkaConsumer = flinkKafkaConsumer09;
            this.sourceContext = sourceContext;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            ConsumerRecords poll;
            try {
                try {
                    long parseLong = Long.parseLong(((FlinkKafkaConsumer09) this.flinkKafkaConsumer).properties.getProperty(FlinkKafkaConsumer09.KEY_POLL_TIMEOUT, Long.toString(100L)));
                    loop0: while (true) {
                        if (!this.running) {
                            break;
                        }
                        synchronized (((FlinkKafkaConsumer09) this.flinkKafkaConsumer).consumer) {
                            try {
                                poll = ((FlinkKafkaConsumer09) this.flinkKafkaConsumer).consumer.poll(parseLong);
                            } catch (WakeupException e) {
                                if (this.running) {
                                    throw e;
                                }
                            }
                        }
                        for (int i = 0; i < ((FlinkKafkaConsumer09) this.flinkKafkaConsumer).subscribedPartitions.size(); i++) {
                            TopicPartition topicPartition = (TopicPartition) ((FlinkKafkaConsumer09) this.flinkKafkaConsumer).subscribedPartitions.get(i);
                            KafkaTopicPartition kafkaTopicPartition = (KafkaTopicPartition) ((FlinkKafkaConsumer09) this.flinkKafkaConsumer).subscribedPartitionsAsFlink.get(i);
                            List records = poll.records(topicPartition);
                            for (int i2 = 0; i2 < records.size(); i2++) {
                                ConsumerRecord consumerRecord = (ConsumerRecord) records.get(i2);
                                Object deserialize = this.flinkKafkaConsumer.deserializer.deserialize((byte[]) consumerRecord.key(), (byte[]) consumerRecord.value(), consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
                                if (this.flinkKafkaConsumer.deserializer.isEndOfStream(deserialize)) {
                                    this.running = false;
                                    break loop0;
                                }
                                synchronized (this.sourceContext.getCheckpointLock()) {
                                    this.sourceContext.collect(deserialize);
                                    this.flinkKafkaConsumer.offsetsState.put(kafkaTopicPartition, Long.valueOf(consumerRecord.offset()));
                                }
                            }
                        }
                    }
                } catch (Throwable th) {
                    if (this.running) {
                        this.flinkKafkaConsumer.stopWithError(th);
                    } else {
                        FlinkKafkaConsumer09.LOG.debug("Stopped ConsumerThread threw exception", th);
                    }
                    try {
                        ((FlinkKafkaConsumer09) this.flinkKafkaConsumer).consumer.close();
                    } catch (Throwable th2) {
                        FlinkKafkaConsumer09.LOG.warn("Error while closing consumer", th2);
                    }
                }
            } finally {
                try {
                    ((FlinkKafkaConsumer09) this.flinkKafkaConsumer).consumer.close();
                } catch (Throwable th3) {
                    FlinkKafkaConsumer09.LOG.warn("Error while closing consumer", th3);
                }
            }
        }

        public void shutdown() {
            this.running = false;
            ((FlinkKafkaConsumer09) this.flinkKafkaConsumer).consumer.wakeup();
        }
    }

    public FlinkKafkaConsumer09(String str, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this((List<String>) Collections.singletonList(str), deserializationSchema, properties);
    }

    public FlinkKafkaConsumer09(String str, KeyedDeserializationSchema<T> keyedDeserializationSchema, Properties properties) {
        this((List<String>) Collections.singletonList(str), keyedDeserializationSchema, properties);
    }

    public FlinkKafkaConsumer09(List<String> list, DeserializationSchema<T> deserializationSchema, Properties properties) {
        this(list, (KeyedDeserializationSchema) new KeyedDeserializationSchemaWrapper(deserializationSchema), properties);
    }

    public FlinkKafkaConsumer09(List<String> list, KeyedDeserializationSchema<T> keyedDeserializationSchema, Properties properties) {
        super(keyedDeserializationSchema, properties);
        Objects.requireNonNull(list, "topics");
        this.properties = (Properties) Objects.requireNonNull(properties, "props");
        setDeserializer(this.properties);
        KafkaConsumer kafkaConsumer = null;
        try {
            kafkaConsumer = new KafkaConsumer(this.properties);
            this.partitionInfos = new ArrayList();
            for (String str : list) {
                List list2 = null;
                for (int i = 0; i < 10; i++) {
                    LOG.info("Trying to get partitions for topic {}", str);
                    try {
                        list2 = kafkaConsumer.partitionsFor(str);
                    } catch (NullPointerException e) {
                    }
                    if (list2 != null && list2.size() > 0) {
                        break;
                    }
                    kafkaConsumer.close();
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e2) {
                    }
                    kafkaConsumer = new KafkaConsumer(this.properties);
                }
                if (list2 != null) {
                    this.partitionInfos.addAll(convertToFlinkKafkaTopicPartition(list2));
                }
            }
            if (this.partitionInfos.isEmpty()) {
                throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + list);
            }
            LOG.info("Got {} partitions from these topics: {}", Integer.valueOf(this.partitionInfos.size()), list);
            if (LOG.isInfoEnabled()) {
                logPartitionInfo(this.partitionInfos);
            }
            this.consumerId = UUID.randomUUID().toString();
        } finally {
            if (kafkaConsumer != null) {
                kafkaConsumer.close();
            }
        }
    }

    public static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> list) {
        Objects.requireNonNull(list, "The given list of partitions was null");
        ArrayList arrayList = new ArrayList(list.size());
        for (PartitionInfo partitionInfo : list) {
            arrayList.add(new KafkaTopicPartition(partitionInfo.topic(), partitionInfo.partition()));
        }
        return arrayList;
    }

    public static List<TopicPartition> convertToKafkaTopicPartition(List<KafkaTopicPartition> list) {
        ArrayList arrayList = new ArrayList(list.size());
        for (KafkaTopicPartition kafkaTopicPartition : list) {
            arrayList.add(new TopicPartition(kafkaTopicPartition.getTopic(), kafkaTopicPartition.getPartition()));
        }
        return arrayList;
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.subscribedPartitionsAsFlink = assignPartitions(this.partitionInfos, getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask());
        if (this.subscribedPartitionsAsFlink.isEmpty()) {
            LOG.info("This consumer doesn't have any partitions assigned");
            this.offsetsState = null;
            return;
        }
        this.properties.setProperty("enable.auto.commit", Boolean.toString(!getRuntimeContext().isCheckpointingEnabled()));
        this.consumer = new KafkaConsumer<>(this.properties);
        this.subscribedPartitions = convertToKafkaTopicPartition(this.subscribedPartitionsAsFlink);
        this.consumer.assign(this.subscribedPartitions);
        if (!Boolean.valueOf(this.properties.getProperty(KEY_DISABLE_METRICS, "false")).booleanValue()) {
            Map metrics = this.consumer.metrics();
            if (metrics == null) {
                LOG.info("Consumer implementation does not support metrics");
            } else {
                for (Map.Entry entry : metrics.entrySet()) {
                    String str = this.consumerId + "-consumer-" + ((MetricName) entry.getKey()).name();
                    DefaultKafkaMetricAccumulator createFor = DefaultKafkaMetricAccumulator.createFor((Metric) entry.getValue());
                    if (createFor != null) {
                        getRuntimeContext().addAccumulator(str, createFor);
                    }
                }
            }
        }
        if (this.restoreToOffset == null) {
            this.offsetsState = new HashMap();
            return;
        }
        for (Map.Entry entry2 : this.restoreToOffset.entrySet()) {
            this.consumer.seek(new TopicPartition(((KafkaTopicPartition) entry2.getKey()).getTopic(), ((KafkaTopicPartition) entry2.getKey()).getPartition()), ((Long) entry2.getValue()).longValue() + 1);
        }
        this.offsetsState = this.restoreToOffset;
    }

    public void run(SourceFunction.SourceContext<T> sourceContext) throws Exception {
        if (this.consumer != null) {
            this.consumerThread = new ConsumerThread<>(this, sourceContext);
            this.consumerThread.setDaemon(true);
            this.consumerThread.start();
            while (this.consumerThread.isAlive()) {
                if (this.consumerThreadException != null) {
                    throw new RuntimeException("ConsumerThread threw an exception: " + this.consumerThreadException.getMessage(), this.consumerThreadException);
                }
                try {
                    this.consumerThread.join(50L);
                } catch (InterruptedException e) {
                    this.consumerThread.shutdown();
                }
            }
            if (this.consumerThreadException != null) {
                throw new RuntimeException("ConsumerThread threw an exception: " + this.consumerThreadException.getMessage(), this.consumerThreadException);
            }
        } else {
            sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
            Object obj = new Object();
            this.waitThread = Thread.currentThread();
            while (this.running) {
                try {
                    synchronized (obj) {
                        obj.wait();
                    }
                } catch (InterruptedException e2) {
                }
            }
        }
        sourceContext.close();
    }

    public void cancel() {
        this.running = false;
        if (this.consumerThread != null) {
            this.consumerThread.shutdown();
        } else if (this.waitThread != null) {
            this.waitThread.interrupt();
        }
    }

    public void close() throws Exception {
        cancel();
        super.close();
    }

    protected void commitOffsets(HashMap<KafkaTopicPartition, Long> hashMap) {
        Map<TopicPartition, OffsetAndMetadata> convertToCommitMap = convertToCommitMap(hashMap);
        synchronized (this.consumer) {
            this.consumer.commitSync(convertToCommitMap);
        }
    }

    public static Map<TopicPartition, OffsetAndMetadata> convertToCommitMap(HashMap<KafkaTopicPartition, Long> hashMap) {
        HashMap hashMap2 = new HashMap(hashMap.size());
        for (Map.Entry<KafkaTopicPartition, Long> entry : hashMap.entrySet()) {
            hashMap2.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), new OffsetAndMetadata(entry.getValue().longValue(), ""));
        }
        return hashMap2;
    }

    protected static void setDeserializer(Properties properties) {
        if (properties.contains("key.deserializer")) {
            LOG.warn("Overwriting the '{}' is not recommended", "key.deserializer");
        } else {
            properties.put("key.deserializer", ByteArrayDeserializer.class.getCanonicalName());
        }
        if (properties.contains("value.deserializer")) {
            LOG.warn("Overwriting the '{}' is not recommended", "value.deserializer");
        } else {
            properties.put("value.deserializer", ByteArrayDeserializer.class.getCanonicalName());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void stopWithError(Throwable th) {
        this.consumerThreadException = th;
    }
}
