package org.apache.flink.streaming.connectors.kafka.shuffle;

import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaException;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.PropertiesUtil;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * A {@link FlinkKafkaProducer} that routes every record to one partition of the default topic,
 * chosen from the record's key, and that can write a watermark to every partition of that topic.
 *
 * <p>Record and watermark payloads are produced by {@link KafkaSerializer}; the
 * {@code KafkaSerializationSchema} handed to the super constructor always returns {@code null}
 * and is not used by the {@code invoke} methods declared here.
 *
 * @param <IN> type of the records written by this producer
 * @param <KEY> type of the key extracted from each record
 */
@Internal
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer.class */
public class FlinkKafkaShuffleProducer<IN, KEY> extends FlinkKafkaProducer<IN> {
    /** Producer property that must hold the expected partition count of the default topic. */
    private static final String PARTITION_NUMBER_PROPERTY = "partition number";

    private final KafkaSerializer<IN> kafkaSerializer;
    private final KeySelector<IN, KEY> keySelector;

    /** Partition count read from the producer properties; verified against the topic in {@code getPartitions}. */
    private final int numberOfPartitions;

    /**
     * Maps the index computed by {@code KafkaTopicPartitionAssigner.assign(topic, partition,
     * partitionCount)} to that partition's id. Filled the first time the topic's partitions are
     * fetched.
     */
    private final Map<Integer, Integer> subtaskToPartitionMap;

    /**
     * Encodes records and watermarks into the byte payloads written to Kafka.
     *
     * <p>Every payload starts with one version byte followed by one tag byte. A record carries
     * {@link #TAG_REC_WITH_TIMESTAMP} followed by the timestamp as a {@code long}, or
     * {@link #TAG_REC_WITHOUT_TIMESTAMP}, and then the record as written by the wrapped
     * {@link TypeSerializer}. A watermark carries {@link #TAG_WATERMARK}, an {@code int} and the
     * watermark timestamp as a {@code long}.
     *
     * @param <IN> type of the serialized records
     */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/shuffle/FlinkKafkaShuffleProducer$KafkaSerializer.class */
    public static final class KafkaSerializer<IN> implements Serializable {
        public static final int TAG_REC_WITH_TIMESTAMP = 0;
        public static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
        public static final int TAG_WATERMARK = 2;
        private static final long serialVersionUID = 2000002L;
        private static final int KAFKA_SHUFFLE_VERSION = 0;
        private static final int INITIAL_BUFFER_SIZE = 16;

        private final TypeSerializer<IN> serializer;

        /** Reusable output buffer; transient, so it is created lazily on first use. */
        private transient DataOutputSerializer dos;

        KafkaSerializer(TypeSerializer<IN> typeSerializer) {
            this.serializer = typeSerializer;
        }

        /**
         * Serializes a record together with its optional timestamp.
         *
         * @param in the record to serialize
         * @param l the record timestamp, or {@code null} if the record has none
         * @return a fresh byte array holding version, tag, optional timestamp and record bytes
         * @throws RuntimeException if writing to the buffer fails with an {@link IOException}
         */
        byte[] serializeRecord(IN in, Long l) {
            DataOutputSerializer out = buffer();
            try {
                out.write(KAFKA_SHUFFLE_VERSION);
                if (l == null) {
                    out.write(TAG_REC_WITHOUT_TIMESTAMP);
                } else {
                    out.write(TAG_REC_WITH_TIMESTAMP);
                    out.writeLong(l.longValue());
                }
                this.serializer.serialize(in, out);
                return out.getCopyOfBuffer();
            } catch (IOException e) {
                throw new RuntimeException("Unable to serialize record", e);
            } finally {
                // Reset even on failure so partial bytes never leak into the next payload.
                out.clear();
            }
        }

        /**
         * Serializes a watermark together with an integer identifying its origin.
         *
         * @param watermark the watermark whose timestamp is written
         * @param i the integer written right after the tag (callers in this file pass the index
         *     of the emitting subtask)
         * @return a fresh byte array holding version, tag, {@code i} and the watermark timestamp
         * @throws RuntimeException if writing to the buffer fails with an {@link IOException}
         */
        byte[] serializeWatermark(Watermark watermark, int i) {
            DataOutputSerializer out = buffer();
            try {
                out.write(KAFKA_SHUFFLE_VERSION);
                out.write(TAG_WATERMARK);
                out.writeInt(i);
                out.writeLong(watermark.getTimestamp());
                return out.getCopyOfBuffer();
            } catch (IOException e) {
                throw new RuntimeException("Unable to serialize watermark", e);
            } finally {
                // Reset even on failure so partial bytes never leak into the next payload.
                out.clear();
            }
        }

        /** Returns the reusable output buffer, creating it on first use. */
        private DataOutputSerializer buffer() {
            if (this.dos == null) {
                this.dos = new DataOutputSerializer(INITIAL_BUFFER_SIZE);
            }
            return this.dos;
        }
    }

    /**
     * Creates the producer.
     *
     * @param str the default topic, passed to the super constructor
     * @param typeSerializer serializer used to encode record payloads
     * @param properties producer properties; must contain the {@code "partition number"} entry
     * @param keySelector extracts the key that decides a record's target partition
     * @param semantic delivery semantic, passed to the super constructor
     * @param i passed to the super constructor as its last argument
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public FlinkKafkaShuffleProducer(String str, TypeSerializer<IN> typeSerializer, Properties properties, KeySelector<IN, KEY> keySelector, FlinkKafkaProducer.Semantic semantic, int i) {
        super(str, (obj, l) -> {
            return null;
        }, properties, semantic, i);
        this.kafkaSerializer = new KafkaSerializer<>(typeSerializer);
        this.keySelector = keySelector;
        Preconditions.checkArgument(properties.getProperty(PARTITION_NUMBER_PROPERTY) != null, "Missing partition number for Kafka Shuffle");
        this.numberOfPartitions = PropertiesUtil.getInt(properties, PARTITION_NUMBER_PROPERTY, Integer.MIN_VALUE);
        this.subtaskToPartitionMap = new HashMap<>();
    }

    /**
     * Writes one record to the partition selected from its key.
     *
     * @param kafkaTransactionState the transaction whose producer sends the record
     * @param in the record to write
     * @param context supplies the record timestamp, which may be {@code null}
     * @throws FlinkKafkaException declared by the overridden method
     * @throws RuntimeException if the key cannot be extracted or mapped to a partition
     */
    @Override // org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
    public void invoke(FlinkKafkaProducer.KafkaTransactionState kafkaTransactionState, IN in, SinkFunction.Context context) throws FlinkKafkaException {
        checkErroneous();
        Long timestamp = context.timestamp();
        int[] partitions = getPartitions(kafkaTransactionState);

        // Only key extraction and partition lookup are guarded here, so that serialization and
        // send failures are not reported as partition-assignment failures.
        int targetPartition;
        try {
            int operatorIndex = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    this.keySelector.getKey(in), partitions.length, partitions.length);
            targetPartition = this.subtaskToPartitionMap.get(operatorIndex);
        } catch (Exception e) {
            throw new RuntimeException("Fail to assign a partition number to record", e);
        }

        ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(
                this.defaultTopicId,
                targetPartition,
                timestamp,
                null,
                this.kafkaSerializer.serializeRecord(in, timestamp));
        this.pendingRecords.incrementAndGet();
        kafkaTransactionState.getProducer().send(producerRecord, this.callback);
    }

    /**
     * Writes the given watermark to every partition of the default topic, tagged with the index
     * of this subtask, using the current transaction's producer.
     *
     * @param watermark the watermark to write
     * @throws FlinkKafkaException declared for parity with the record {@code invoke}
     */
    public void invoke(Watermark watermark) throws FlinkKafkaException {
        checkErroneous();
        FlinkKafkaProducer.KafkaTransactionState transaction = currentTransaction();
        int[] partitions = getPartitions(transaction);
        int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
        long timestamp = watermark.getTimestamp();

        // The payload does not depend on the target partition, so it is serialized once.
        byte[] payload = this.kafkaSerializer.serializeWatermark(watermark, indexOfThisSubtask);
        for (int partition : partitions) {
            ProducerRecord<byte[], byte[]> producerRecord =
                    new ProducerRecord<>(this.defaultTopicId, partition, timestamp, null, payload);
            this.pendingRecords.incrementAndGet();
            transaction.getProducer().send(producerRecord, this.callback);
        }
    }

    /**
     * Returns the partitions of the default topic. On the first call they are fetched through the
     * transaction's producer, cached in {@code topicPartitionsMap}, and used to fill
     * {@code subtaskToPartitionMap}.
     *
     * @param kafkaTransactionState the transaction whose producer is used to look up partitions
     * @return the partition ids of the default topic
     * @throws IllegalArgumentException if the partition count differs from the configured one
     */
    private int[] getPartitions(FlinkKafkaProducer.KafkaTransactionState kafkaTransactionState) {
        int[] partitions = this.topicPartitionsMap.get(this.defaultTopicId);
        if (partitions == null) {
            partitions = getPartitionsByTopic(this.defaultTopicId, kafkaTransactionState.getProducer());
            this.topicPartitionsMap.put(this.defaultTopicId, partitions);
            for (int partition : partitions) {
                int assignedIndex = KafkaTopicPartitionAssigner.assign(this.defaultTopicId, partition, partitions.length);
                this.subtaskToPartitionMap.put(assignedIndex, partition);
            }
        }
        // Checked on every call; the message is only built on failure to keep this path cheap.
        if (partitions.length != this.numberOfPartitions) {
            throw new IllegalArgumentException(
                    "Topic " + this.defaultTopicId + " has " + partitions.length
                            + " partitions but " + this.numberOfPartitions + " were configured for Kafka Shuffle");
        }
        return partitions;
    }

    // The erased bridge invoke(Object, Object, SinkFunction.Context) and the $deserializeLambda$
    // hook for the serializable lambda in the constructor are synthesized by javac, so they are
    // deliberately not declared in source.
}
