package org.apache.flink.streaming.connectors.kafka;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.class */
public abstract class KafkaProducerTestBase extends KafkaTestBase {

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase$CustomKeyedSerializationSchemaWrapper.class */
    public static class CustomKeyedSerializationSchemaWrapper extends KeyedSerializationSchemaWrapper<Tuple2<Long, String>> {
        private final String defaultTopic;
        private final String dynamicTopic;

        public CustomKeyedSerializationSchemaWrapper(SerializationSchema<Tuple2<Long, String>> serializationSchema, String str, String str2) {
            super(serializationSchema);
            this.defaultTopic = (String) Preconditions.checkNotNull(str);
            this.dynamicTopic = (String) Preconditions.checkNotNull(str2);
        }

        public String getTargetTopic(Tuple2<Long, String> tuple2) {
            return ((Long) tuple2.f0).longValue() % 2 == 0 ? this.defaultTopic : this.dynamicTopic;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase$CustomPartitioner.class */
    public static class CustomPartitioner extends FlinkKafkaPartitioner<Tuple2<Long, String>> implements Serializable {
        private final Map<String, Integer> expectedTopicsToNumPartitions;

        public CustomPartitioner(Map<String, Integer> map) {
            this.expectedTopicsToNumPartitions = map;
        }

        public int partition(Tuple2<Long, String> tuple2, byte[] bArr, byte[] bArr2, String str, int[] iArr) {
            Assert.assertEquals(this.expectedTopicsToNumPartitions.get(str).intValue(), iArr.length);
            return (int) (((Long) tuple2.f0).longValue() % iArr.length);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase$PartitionValidatingMapper.class */
    public static class PartitionValidatingMapper extends RichMapFunction<Tuple2<Long, String>, Integer> {
        private final int numPartitions;
        private int ourPartition = -1;

        public PartitionValidatingMapper(int i) {
            this.numPartitions = i;
        }

        public Integer map(Tuple2<Long, String> tuple2) throws Exception {
            int intValue = ((Long) tuple2.f0).intValue() % this.numPartitions;
            if (this.ourPartition != -1) {
                Assert.assertEquals("inconsistent partitioning", this.ourPartition, intValue);
            } else {
                this.ourPartition = intValue;
            }
            return Integer.valueOf(intValue);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase$PartitionValidatingSink.class */
    public static class PartitionValidatingSink implements SinkFunction<Integer> {
        private final int[] valuesPerPartition;

        public PartitionValidatingSink(int i) {
            this.valuesPerPartition = new int[i];
        }

        public void invoke(Integer num) throws Exception {
            int[] iArr = this.valuesPerPartition;
            int intValue = num.intValue();
            iArr[intValue] = iArr[intValue] + 1;
            boolean z = false;
            int[] iArr2 = this.valuesPerPartition;
            int length = iArr2.length;
            int i = 0;
            while (true) {
                if (i >= length) {
                    break;
                }
                if (iArr2[i] < 100) {
                    z = true;
                    break;
                }
                i++;
            }
            if (!z) {
                throw new SuccessException();
            }
        }
    }

    public void runCustomPartitioningTest() {
        try {
            LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
            createTestTopic("defaultTopic", 2, 1);
            createTestTopic("dynamicTopic", 3, 1);
            HashMap hashMap = new HashMap(2);
            hashMap.put("defaultTopic", 2);
            hashMap.put("dynamicTopic", 3);
            TypeInformation parse = TypeInfoParser.parse("Tuple2<Long, String>");
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            executionEnvironment.getConfig().disableSysoutLogging();
            TypeInformationSerializationSchema typeInformationSerializationSchema = new TypeInformationSerializationSchema(parse, executionEnvironment.getConfig());
            TypeInformationSerializationSchema typeInformationSerializationSchema2 = new TypeInformationSerializationSchema(parse, executionEnvironment.getConfig());
            DataStreamSource parallelism = executionEnvironment.addSource(new SourceFunction<Tuple2<Long, String>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.1
                private boolean running = true;

                public void run(SourceFunction.SourceContext<Tuple2<Long, String>> sourceContext) throws Exception {
                    long j = 0;
                    while (true) {
                        long j2 = j;
                        if (!this.running) {
                            return;
                        }
                        sourceContext.collect(new Tuple2(Long.valueOf(j2), "kafka-" + j2));
                        j = j2 + 1;
                    }
                }

                public void cancel() {
                    this.running = false;
                }
            }).setParallelism(1);
            Properties properties = new Properties();
            properties.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings));
            properties.putAll(secureProps);
            kafkaServer.produceIntoKafka(parallelism, "defaultTopic", new CustomKeyedSerializationSchemaWrapper(typeInformationSerializationSchema, "defaultTopic", "dynamicTopic"), properties, new CustomPartitioner(hashMap)).setParallelism(Math.max(2, 3));
            Properties properties2 = new Properties();
            properties2.putAll(standardProps);
            properties2.putAll(secureProps);
            FlinkKafkaConsumerBase consumer = kafkaServer.getConsumer("defaultTopic", (DeserializationSchema) typeInformationSerializationSchema2, properties2);
            FlinkKafkaConsumerBase consumer2 = kafkaServer.getConsumer("dynamicTopic", (DeserializationSchema) typeInformationSerializationSchema2, properties2);
            executionEnvironment.addSource(consumer).setParallelism(2).map(new PartitionValidatingMapper(2)).setParallelism(2).addSink(new PartitionValidatingSink(2)).setParallelism(1);
            executionEnvironment.addSource(consumer2).setParallelism(3).map(new PartitionValidatingMapper(3)).setParallelism(3).addSink(new PartitionValidatingSink(3)).setParallelism(1);
            TestUtils.tryExecute(executionEnvironment, "custom partitioning test");
            deleteTestTopic("defaultTopic");
            deleteTestTopic("dynamicTopic");
            LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }
}
