package org.apache.flink.streaming.connectors.kafka.testutils;

import java.util.Properties;
import java.util.Random;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.class */
public class DataGenerators {

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators$InfiniteStringsGenerator.class */
    public static class InfiniteStringsGenerator extends Thread {
        private final KafkaTestEnvironment server;
        private final String topic;
        private volatile Throwable error;
        private volatile boolean running = true;

        public InfiniteStringsGenerator(KafkaTestEnvironment kafkaTestEnvironment, String str) {
            this.server = kafkaTestEnvironment;
            this.topic = str;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            FlinkKafkaProducerBase flinkKafkaProducerBase = null;
            try {
                try {
                    Properties propertiesFromBrokerList = FlinkKafkaProducerBase.getPropertiesFromBrokerList(this.server.getBrokerConnectionString());
                    propertiesFromBrokerList.setProperty("retries", "3");
                    flinkKafkaProducerBase = this.server.getProducer(this.topic, new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), propertiesFromBrokerList, new FixedPartitioner());
                    flinkKafkaProducerBase.setRuntimeContext(new MockRuntimeContext(1, 0));
                    flinkKafkaProducerBase.open(new Configuration());
                    StringBuilder sb = new StringBuilder();
                    Random random = new Random();
                    while (this.running) {
                        sb.setLength(0);
                        int nextInt = random.nextInt(100) + 1;
                        for (int i = 0; i < nextInt; i++) {
                            sb.append((char) (random.nextInt(20) + 97));
                        }
                        flinkKafkaProducerBase.invoke(sb.toString());
                    }
                    if (flinkKafkaProducerBase != null) {
                        try {
                            flinkKafkaProducerBase.close();
                        } catch (Throwable th) {
                        }
                    }
                } catch (Throwable th2) {
                    if (flinkKafkaProducerBase != null) {
                        try {
                            flinkKafkaProducerBase.close();
                        } catch (Throwable th3) {
                        }
                    }
                    throw th2;
                }
            } catch (Throwable th4) {
                this.error = th4;
                if (flinkKafkaProducerBase != null) {
                    try {
                        flinkKafkaProducerBase.close();
                    } catch (Throwable th5) {
                    }
                }
            }
        }

        public void shutdown() {
            this.running = false;
            interrupt();
        }

        public Throwable getError() {
            return this.error;
        }
    }

    public static void generateLongStringTupleSequence(StreamExecutionEnvironment streamExecutionEnvironment, KafkaTestEnvironment kafkaTestEnvironment, String str, int i, final int i2, final int i3) throws Exception {
        TypeInformation parse = TypeInfoParser.parse("Tuple2<Integer, Integer>");
        streamExecutionEnvironment.setParallelism(i);
        streamExecutionEnvironment.getConfig().disableSysoutLogging();
        streamExecutionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        streamExecutionEnvironment.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() { // from class: org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators.1
            private volatile boolean running = true;

            public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> sourceContext) throws Exception {
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                for (int i4 = i2; this.running && i4 <= i3; i4++) {
                    sourceContext.collect(new Tuple2(Integer.valueOf(indexOfThisSubtask), Integer.valueOf(i4)));
                }
            }

            public void cancel() {
                this.running = false;
            }
        }).addSink(kafkaTestEnvironment.getProducer(str, new KeyedSerializationSchemaWrapper(new TypeInformationSerializationSchema(parse, streamExecutionEnvironment.getConfig())), FlinkKafkaProducerBase.getPropertiesFromBrokerList(kafkaTestEnvironment.getBrokerConnectionString()), new Tuple2Partitioner(i)));
        streamExecutionEnvironment.execute("Data generator (Int, Int) stream to topic " + str);
    }

    public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment streamExecutionEnvironment, KafkaTestEnvironment kafkaTestEnvironment, String str, int i, final int i2, final boolean z) throws Exception {
        streamExecutionEnvironment.setParallelism(i);
        streamExecutionEnvironment.getConfig().disableSysoutLogging();
        streamExecutionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        streamExecutionEnvironment.addSource(new RichParallelSourceFunction<Integer>() { // from class: org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators.2
            private volatile boolean running = true;

            public void run(SourceFunction.SourceContext<Integer> sourceContext) {
                int[] iArr = new int[i2];
                int i3 = 0;
                int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                while (true) {
                    int i4 = indexOfThisSubtask;
                    if (i3 >= i2) {
                        break;
                    }
                    iArr[i3] = i4;
                    i3++;
                    indexOfThisSubtask = i4 + getRuntimeContext().getNumberOfParallelSubtasks();
                }
                if (z) {
                    Random random = new Random();
                    for (int i5 = 0; i5 < iArr.length; i5++) {
                        int nextInt = random.nextInt(iArr.length);
                        int i6 = iArr[i5];
                        iArr[i5] = iArr[nextInt];
                        iArr[nextInt] = i6;
                    }
                }
                int i7 = 0;
                while (this.running && i7 < iArr.length) {
                    int i8 = i7;
                    i7++;
                    sourceContext.collect(Integer.valueOf(iArr[i8]));
                }
            }

            public void cancel() {
                this.running = false;
            }
        }).rebalance().addSink(kafkaTestEnvironment.getProducer(str, new KeyedSerializationSchemaWrapper(new TypeInformationSerializationSchema(BasicTypeInfo.INT_TYPE_INFO, streamExecutionEnvironment.getConfig())), FlinkKafkaProducerBase.getPropertiesFromBrokerList(kafkaTestEnvironment.getBrokerConnectionString()), new KafkaPartitioner<Integer>() { // from class: org.apache.flink.streaming.connectors.kafka.testutils.DataGenerators.3
            public int partition(Integer num, byte[] bArr, byte[] bArr2, int i3) {
                return num.intValue() % i3;
            }
        }));
        streamExecutionEnvironment.execute("Scrambles int sequence generator");
    }
}
