package org.apache.flink.streaming.connectors.kafka.internals;

import java.io.Serializable;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ByteSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/internals/KafkaShuffleFetcher.class */
public class KafkaShuffleFetcher<T> extends KafkaFetcher<T> {
    private final WatermarkHandler watermarkHandler;
    private final KafkaShuffleElementDeserializer kafkaShuffleDeserializer;

    /**
     * Common base type of the elements produced by {@link KafkaShuffleElementDeserializer}.
     *
     * <p>An element is either a {@link KafkaShuffleRecord} or a {@link KafkaShuffleWatermark}.
     * The {@code isX()} predicates compare the exact runtime class, so an instance of a
     * subclass of either type is reported as neither.
     */
    @VisibleForTesting
    public abstract static class KafkaShuffleElement {

        /** Returns {@code true} iff the runtime class of this element is exactly {@link KafkaShuffleRecord}. */
        public boolean isRecord() {
            return KafkaShuffleRecord.class == getClass();
        }

        /** Returns {@code true} iff the runtime class of this element is exactly {@link KafkaShuffleWatermark}. */
        public boolean isWatermark() {
            return KafkaShuffleWatermark.class == getClass();
        }

        /**
         * Views this element as a record.
         *
         * @param <T> value type the caller expects; it is not verified at runtime
         * @return this instance, typed as a record
         * @throws ClassCastException if this element is not a {@link KafkaShuffleRecord}
         */
        @SuppressWarnings("unchecked") // the value type is erased; the caller chooses T
        public <T> KafkaShuffleRecord<T> asRecord() {
            return (KafkaShuffleRecord<T>) this;
        }

        /**
         * Views this element as a watermark.
         *
         * @return this instance, typed as a watermark
         * @throws ClassCastException if this element is not a {@link KafkaShuffleWatermark}
         */
        public KafkaShuffleWatermark asWatermark() {
            return (KafkaShuffleWatermark) this;
        }
    }

    /**
     * Turns the value bytes of a Kafka {@link ConsumerRecord} into a {@link KafkaShuffleElement}.
     *
     * <p>Layout read by {@link #deserialize(ConsumerRecord)}: one leading byte that is read and
     * discarded, one tag byte, then a tag-specific payload:
     *
     * <ul>
     *   <li>tag {@code 0}: a {@code long} timestamp followed by a value read with the type serializer
     *   <li>tag {@code 1}: a value read with the type serializer (no timestamp)
     *   <li>tag {@code 2}: an {@code int} subtask index followed by a {@code long} watermark
     * </ul>
     *
     * <p>NOTE(review): the discarded leading byte is presumably a format version and the tag
     * values presumably mirror constants on the producer side; confirm against the writer.
     *
     * @param <T> type of the record values
     */
    @VisibleForTesting
    public static class KafkaShuffleElementDeserializer<T> implements Serializable {
        private static final long serialVersionUID = 1000001L;

        /** Tag of a record that carries its own timestamp. */
        private static final byte TAG_RECORD_WITH_TIMESTAMP = 0;

        /** Tag of a record without a timestamp. */
        private static final byte TAG_RECORD_WITHOUT_TIMESTAMP = 1;

        /** Tag of a watermark element. */
        private static final byte TAG_WATERMARK = 2;

        private final TypeSerializer<T> typeSerializer;

        /** Reusable input view; transient, so it is created lazily on the first call to deserialize. */
        private transient DataInputDeserializer dis;

        /**
         * @param typeSerializer serializer used to read record values
         */
        @VisibleForTesting
        public KafkaShuffleElementDeserializer(TypeSerializer<T> typeSerializer) {
            this.typeSerializer = typeSerializer;
        }

        /**
         * Decodes one shuffle element from the value of the given Kafka record.
         *
         * @param consumerRecord Kafka record whose value holds the encoded element
         * @return the decoded record or watermark
         * @throws UnsupportedOperationException if the tag byte is not one of the known tags
         * @throws Exception if reading from the underlying buffer fails
         */
        @VisibleForTesting
        public KafkaShuffleElement deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            byte[] payload = consumerRecord.value();
            if (this.dis == null) {
                this.dis = new DataInputDeserializer(payload);
            } else {
                // Re-point the existing view at the new payload instead of allocating a new one.
                this.dis.setBuffer(payload);
            }

            // The leading byte is consumed but its value is not used.
            ByteSerializer.INSTANCE.deserialize(this.dis);
            byte tag = ByteSerializer.INSTANCE.deserialize(this.dis).byteValue();

            switch (tag) {
                case TAG_RECORD_WITHOUT_TIMESTAMP:
                    return new KafkaShuffleRecord<>(this.typeSerializer.deserialize(this.dis));
                case TAG_RECORD_WITH_TIMESTAMP: {
                    // Read order matters: timestamp first, then the value.
                    long timestamp = LongSerializer.INSTANCE.deserialize(this.dis).longValue();
                    T value = this.typeSerializer.deserialize(this.dis);
                    return new KafkaShuffleRecord<>(timestamp, value);
                }
                case TAG_WATERMARK: {
                    // Read order matters: subtask index first, then the watermark.
                    int subtask = IntSerializer.INSTANCE.deserialize(this.dis).intValue();
                    long watermark = LongSerializer.INSTANCE.deserialize(this.dis).longValue();
                    return new KafkaShuffleWatermark(subtask, watermark);
                }
                default:
                    throw new UnsupportedOperationException("Unsupported tag format");
            }
        }
    }

    /**
     * A shuffle element that carries a value and, optionally, a timestamp.
     *
     * @param <T> type of the carried value
     */
    @VisibleForTesting
    public static class KafkaShuffleRecord<T> extends KafkaShuffleElement {
        /** The carried value. */
        final T value;

        /** Timestamp attached to the value, or {@code null} when the record was built without one. */
        final Long timestamp;

        /** Builds a record without a timestamp; {@link #getTimestamp()} then returns {@code null}. */
        KafkaShuffleRecord(T value) {
            this.timestamp = null;
            this.value = value;
        }

        /** Builds a record that carries the given timestamp alongside its value. */
        KafkaShuffleRecord(long timestamp, T value) {
            this.timestamp = timestamp;
            this.value = value;
        }

        /** @return the carried value */
        public T getValue() {
            return value;
        }

        /** @return the attached timestamp, or {@code null} if the record has none */
        public Long getTimestamp() {
            return timestamp;
        }
    }

    /** A shuffle element that carries a watermark value together with a subtask index. */
    @VisibleForTesting
    public static class KafkaShuffleWatermark extends KafkaShuffleElement {
        /** Subtask index stored with this watermark. */
        final int subtask;

        /** The watermark value. */
        final long watermark;

        /** Builds a watermark element from a subtask index and a watermark value. */
        KafkaShuffleWatermark(int subtask, long watermark) {
            this.watermark = watermark;
            this.subtask = subtask;
        }

        /** @return the subtask index stored with this watermark */
        public int getSubtask() {
            return subtask;
        }

        /** @return the watermark value */
        public long getWatermark() {
            return watermark;
        }
    }

    /**
     * Remembers the latest watermark seen per subtask index and derives the combined watermark
     * as the minimum over all remembered values.
     *
     * <p>No combined watermark is produced until watermarks from at least
     * {@code producerParallelism} distinct subtask indices have been recorded.
     */
    private static class WatermarkHandler {
        /** Number of distinct subtask indices that must have reported before anything is emitted. */
        private final int producerParallelism;

        /** Latest watermark recorded per subtask index. */
        private final Map<Integer, Long> subtaskWatermark;

        /** Last combined watermark handed out; starts at {@link Long#MIN_VALUE}. */
        private long currentMinWatermark = Long.MIN_VALUE;

        WatermarkHandler(int producerParallelism) {
            this.producerParallelism = producerParallelism;
            this.subtaskWatermark = new HashMap<>(producerParallelism);
        }

        /**
         * Records the given watermark for its subtask and returns a new combined watermark if
         * the minimum over all subtasks has advanced.
         *
         * @param kafkaShuffleWatermark watermark element just read
         * @return the new combined watermark, or empty if not all subtasks have reported yet or
         *     the minimum did not strictly increase
         * @throws IllegalStateException if the subtask already reported a watermark that is
         *     greater than or equal to the new one
         */
        public Optional<Watermark> checkAndGetNewWatermark(KafkaShuffleWatermark kafkaShuffleWatermark) {
            Long previous = this.subtaskWatermark.get(kafkaShuffleWatermark.subtask);
            // Template form: the message is only assembled when the check actually fails.
            Preconditions.checkState(
                    previous == null || previous < kafkaShuffleWatermark.watermark,
                    "Watermark should always increase: current : new %s:%s",
                    previous,
                    kafkaShuffleWatermark.watermark);
            this.subtaskWatermark.put(kafkaShuffleWatermark.subtask, kafkaShuffleWatermark.watermark);

            if (this.subtaskWatermark.size() < this.producerParallelism) {
                return Optional.empty();
            }

            // The map holds at least the entry stored above, so the fallback is never used.
            long minWatermark = this.subtaskWatermark.values().stream()
                    .mapToLong(Long::longValue)
                    .min()
                    .orElse(Long.MIN_VALUE);
            if (minWatermark <= this.currentMinWatermark) {
                return Optional.empty();
            }
            this.currentMinWatermark = minWatermark;
            return Optional.of(new Watermark(minWatermark));
        }
    }

    /**
     * Creates the fetcher.
     *
     * <p>The first thirteen arguments are forwarded unchanged, in order, to the
     * {@code KafkaFetcher} constructor; their meaning is defined there and is not visible in
     * this class. Only the last two arguments are consumed here.
     *
     * @param typeSerializer serializer used to read record values from the Kafka payload
     * @param i number of distinct producer subtasks whose watermarks must have been seen
     *     before a combined watermark is emitted
     * @throws Exception if the {@code KafkaFetcher} constructor fails
     */
    public KafkaShuffleFetcher(SourceFunction.SourceContext<T> sourceContext, Map<KafkaTopicPartition, Long> map, SerializedValue<WatermarkStrategy<T>> serializedValue, ProcessingTimeService processingTimeService, long j, ClassLoader classLoader, String str, KafkaDeserializationSchema<T> kafkaDeserializationSchema, Properties properties, long j2, MetricGroup metricGroup, MetricGroup metricGroup2, boolean z, TypeSerializer<T> typeSerializer, int i) throws Exception {
        super(sourceContext, map, serializedValue, processingTimeService, j, classLoader, str, kafkaDeserializationSchema, properties, j2, metricGroup, metricGroup2, z);
        this.watermarkHandler = new WatermarkHandler(i);
        this.kafkaShuffleDeserializer = new KafkaShuffleElementDeserializer<>(typeSerializer);
    }

    /**
     * Returns the constant name of this fetcher.
     *
     * <p>How the name is used is decided by {@code KafkaFetcher}, which is not visible here.
     *
     * @return the literal {@code "Kafka Shuffle Fetcher"}
     */
    @Override // org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher
    protected String getFetcherName() {
        return "Kafka Shuffle Fetcher";
    }

    /**
     * Decodes every Kafka record of one partition and forwards it to the source context.
     *
     * <ul>
     *   <li>Record elements are emitted with a timestamp while holding the checkpoint lock, and
     *       the partition offset is advanced inside the same lock. The element's own timestamp
     *       is used when present, otherwise the timestamp of the Kafka record.
     *   <li>Watermark elements are passed to the watermark handler; a watermark is emitted only
     *       when the handler returns one. This branch neither takes the checkpoint lock nor
     *       updates the partition offset.
     *   <li>Elements that are neither are ignored.
     * </ul>
     *
     * @param list Kafka records fetched for one partition, processed in iteration order
     * @param kafkaTopicPartitionState state of that partition; receives the offset of each emitted record
     * @throws Exception if deserialization or emission fails
     */
    @Override // org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher
    protected void partitionConsumerRecordsHandler(List<ConsumerRecord<byte[], byte[]>> list, KafkaTopicPartitionState<T, TopicPartition> kafkaTopicPartitionState) throws Exception {
        for (ConsumerRecord<byte[], byte[]> consumerRecord : list) {
            KafkaShuffleElement element = this.kafkaShuffleDeserializer.deserialize(consumerRecord);
            if (element.isRecord()) {
                // Emission and offset update happen under one lock so they stay consistent with each other.
                synchronized (this.checkpointLock) {
                    KafkaShuffleRecord<T> shuffleRecord = element.asRecord();
                    long timestamp = shuffleRecord.timestamp == null
                            ? consumerRecord.timestamp()
                            : shuffleRecord.timestamp.longValue();
                    this.sourceContext.collectWithTimestamp(shuffleRecord.value, timestamp);
                    kafkaTopicPartitionState.setOffset(consumerRecord.offset());
                }
            } else if (element.isWatermark()) {
                // Typed Optional so the method reference resolves against Consumer<? super Watermark>.
                Optional<Watermark> newWatermark =
                        this.watermarkHandler.checkAndGetNewWatermark(element.asWatermark());
                newWatermark.ifPresent(this.sourceContext::emitWatermark);
            }
        }
    }
}
