/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka.internal;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.internal.Handover;
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerCallBridge;
import org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Kafka09Fetcher<T>
extends AbstractFetcher<T, TopicPartition> {
    private static final Logger LOG = LoggerFactory.getLogger(Kafka09Fetcher.class);
    private final KeyedDeserializationSchema<T> deserializer;
    private final Handover handover;
    private final KafkaConsumerThread consumerThread;
    private volatile boolean running = true;

    public Kafka09Fetcher(SourceFunction.SourceContext<T> sourceContext, List<KafkaTopicPartition> assignedPartitions, SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic, SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated, ProcessingTimeService processingTimeProvider, long autoWatermarkInterval, ClassLoader userCodeClassLoader, boolean enableCheckpointing, String taskNameWithSubtasks, MetricGroup metricGroup, KeyedDeserializationSchema<T> deserializer, Properties kafkaProperties, long pollTimeout, boolean useMetrics) throws Exception {
        super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, userCodeClassLoader, useMetrics);
        this.deserializer = deserializer;
        this.handover = new Handover();
        MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
        this.addOffsetStateGauge(kafkaMetricGroup);
        kafkaProperties.setProperty("enable.auto.commit", Boolean.toString(!enableCheckpointing));
        this.consumerThread = new KafkaConsumerThread(LOG, this.handover, kafkaProperties, this.subscribedPartitions(), kafkaMetricGroup, this.createCallBridge(), this.getFetcherName() + " for " + taskNameWithSubtasks, pollTimeout, useMetrics);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void runFetchLoop() throws Exception {
        try {
            Handover handover = this.handover;
            this.consumerThread.start();
            while (this.running) {
                ConsumerRecords<byte[], byte[]> records = handover.pollNext();
                block6: for (KafkaTopicPartitionState partition : this.subscribedPartitions()) {
                    List partitionRecords = records.records((TopicPartition)partition.getKafkaPartitionHandle());
                    for (ConsumerRecord record : partitionRecords) {
                        Object value = this.deserializer.deserialize((byte[])record.key(), (byte[])record.value(), record.topic(), record.partition(), record.offset());
                        if (this.deserializer.isEndOfStream(value)) {
                            this.running = false;
                            continue block6;
                        }
                        this.emitRecord(value, (KafkaTopicPartitionState<TopicPartition>)partition, record.offset(), record);
                    }
                }
            }
        }
        finally {
            this.consumerThread.shutdown();
        }
        try {
            this.consumerThread.join();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void cancel() {
        this.running = false;
        this.handover.close();
        this.consumerThread.shutdown();
    }

    protected void emitRecord(T record, KafkaTopicPartitionState<TopicPartition> partition, long offset, ConsumerRecord<?, ?> consumerRecord) throws Exception {
        this.emitRecord(record, partition, offset);
    }

    protected String getFetcherName() {
        return "Kafka 0.9 Fetcher";
    }

    protected KafkaConsumerCallBridge createCallBridge() {
        return new KafkaConsumerCallBridge();
    }

    public TopicPartition createKafkaPartitionHandle(KafkaTopicPartition partition) {
        return new TopicPartition(partition.getTopic(), partition.getPartition());
    }

    public void commitInternalOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
        KafkaTopicPartitionState[] partitions = this.subscribedPartitions();
        HashMap<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<TopicPartition, OffsetAndMetadata>(partitions.length);
        for (KafkaTopicPartitionState partition : partitions) {
            Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
            if (lastProcessedOffset == null) continue;
            long offsetToCommit = lastProcessedOffset + 1L;
            offsetsToCommit.put((TopicPartition)partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
            partition.setCommittedOffset(offsetToCommit);
        }
        this.consumerThread.setOffsetsToCommit(offsetsToCommit);
    }
}

