package org.apache.flink.streaming.connectors.kafka;

import java.io.Serializable;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestUtils;
import org.junit.Assert;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.class */
public abstract class KafkaProducerTestBase extends KafkaTestBase {

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase$CustomPartitioner.class */
    public static class CustomPartitioner extends KafkaPartitioner implements Serializable {
        private final int expectedPartitions;

        public CustomPartitioner(int i) {
            this.expectedPartitions = i;
        }

        public int partition(Object obj, byte[] bArr, byte[] bArr2, int i) {
            Assert.assertEquals(this.expectedPartitions, i);
            return (int) (((Long) ((Tuple2) obj).f0).longValue() % i);
        }
    }

    public void runCustomPartitioningTest() {
        try {
            LOG.info("Starting KafkaProducerITCase.testCustomPartitioning()");
            createTestTopic("customPartitioningTestTopic", 3, 1);
            TypeInformation parse = TypeInfoParser.parse("Tuple2<Long, String>");
            StreamExecutionEnvironment createRemoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort, new String[0]);
            createRemoteEnvironment.setRestartStrategy(RestartStrategies.noRestart());
            createRemoteEnvironment.getConfig().disableSysoutLogging();
            TypeInformationSerializationSchema typeInformationSerializationSchema = new TypeInformationSerializationSchema(parse, createRemoteEnvironment.getConfig());
            TypeInformationSerializationSchema typeInformationSerializationSchema2 = new TypeInformationSerializationSchema(parse, createRemoteEnvironment.getConfig());
            createRemoteEnvironment.addSource(new SourceFunction<Tuple2<Long, String>>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.1
                private boolean running = true;

                public void run(SourceFunction.SourceContext<Tuple2<Long, String>> sourceContext) throws Exception {
                    long j = 0;
                    while (true) {
                        long j2 = j;
                        if (!this.running) {
                            return;
                        }
                        sourceContext.collect(new Tuple2(Long.valueOf(j2), "kafka-" + j2));
                        j = j2 + 1;
                    }
                }

                public void cancel() {
                    this.running = false;
                }
            }).setParallelism(1).addSink(kafkaServer.getProducer("customPartitioningTestTopic", new KeyedSerializationSchemaWrapper(typeInformationSerializationSchema), FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings), new CustomPartitioner(3))).setParallelism(3);
            createRemoteEnvironment.addSource(kafkaServer.getConsumer("customPartitioningTestTopic", (DeserializationSchema) typeInformationSerializationSchema2, standardProps)).setParallelism(3).map(new RichMapFunction<Tuple2<Long, String>, Integer>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.3
                private int ourPartition = -1;

                public Integer map(Tuple2<Long, String> tuple2) {
                    int intValue = ((Long) tuple2.f0).intValue() % 3;
                    if (this.ourPartition != -1) {
                        Assert.assertEquals("inconsistent partitioning", this.ourPartition, intValue);
                    } else {
                        this.ourPartition = intValue;
                    }
                    return Integer.valueOf(intValue);
                }
            }).setParallelism(3).addSink(new SinkFunction<Integer>() { // from class: org.apache.flink.streaming.connectors.kafka.KafkaProducerTestBase.2
                private int[] valuesPerPartition = new int[3];

                public void invoke(Integer num) throws Exception {
                    int[] iArr = this.valuesPerPartition;
                    int intValue = num.intValue();
                    iArr[intValue] = iArr[intValue] + 1;
                    boolean z = false;
                    int[] iArr2 = this.valuesPerPartition;
                    int length = iArr2.length;
                    int i = 0;
                    while (true) {
                        if (i >= length) {
                            break;
                        }
                        if (iArr2[i] < 100) {
                            z = true;
                            break;
                        }
                        i++;
                    }
                    if (!z) {
                        throw new SuccessException();
                    }
                }
            }).setParallelism(1);
            TestUtils.tryExecute(createRemoteEnvironment, "custom partitioning test");
            deleteTestTopic("customPartitioningTestTopic");
            LOG.info("Finished KafkaProducerITCase.testCustomPartitioning()");
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }
}
