package org.apache.flink.streaming.connectors.kafka.testutils;

import java.util.Collection;
import java.util.Properties;
import java.util.Random;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.class */
public class DataGenerators {

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators$InfiniteStringsGenerator.class */
    public static class InfiniteStringsGenerator extends Thread {
        private final KafkaTestEnvironment server;
        private final String topic;
        private volatile Throwable error;
        private volatile boolean running = true;

        /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators$InfiniteStringsGenerator$DummyStreamExecutionEnvironment.class */
        private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
            private DummyStreamExecutionEnvironment() {
            }

            public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
                return null;
            }
        }

        /* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators$InfiniteStringsGenerator$MockTransformation.class */
        private static class MockTransformation extends Transformation<String> {
            public MockTransformation() {
                super("MockTransform", BasicTypeInfo.STRING_TYPE_INFO, 1);
            }

            public Collection<Transformation<?>> getTransitivePredecessors() {
                return null;
            }
        }

        public InfiniteStringsGenerator(KafkaTestEnvironment kafkaTestEnvironment, String str) {
            this.server = kafkaTestEnvironment;
            this.topic = str;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            RichFunction richFunction = null;
            try {
                try {
                    Properties propertiesFromBrokerList = FlinkKafkaProducerBase.getPropertiesFromBrokerList(this.server.getBrokerConnectionString());
                    propertiesFromBrokerList.setProperty("retries", "3");
                    new DataStream(new DummyStreamExecutionEnvironment(), new MockTransformation());
                    OneInputStreamOperatorTestHarness oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness(this.server.getProducerSink(this.topic, new KeyedSerializationSchemaWrapper(new SimpleStringSchema()), propertiesFromBrokerList, new FlinkFixedPartitioner()));
                    oneInputStreamOperatorTestHarness.open();
                    StringBuilder sb = new StringBuilder();
                    Random random = new Random();
                    while (this.running) {
                        sb.setLength(0);
                        int nextInt = random.nextInt(100) + 1;
                        for (int i = 0; i < nextInt; i++) {
                            sb.append((char) (random.nextInt(20) + 97));
                        }
                        oneInputStreamOperatorTestHarness.processElement(new StreamRecord(sb.toString()));
                    }
                    if (0 != 0) {
                        try {
                            richFunction.close();
                        } catch (Throwable th) {
                        }
                    }
                } catch (Throwable th2) {
                    this.error = th2;
                    if (0 != 0) {
                        try {
                            richFunction.close();
                        } catch (Throwable th3) {
                        }
                    }
                }
            } catch (Throwable th4) {
                if (0 != 0) {
                    try {
                        richFunction.close();
                    } catch (Throwable th5) {
                    }
                }
                throw th4;
            }
        }

        public void shutdown() {
            this.running = false;
            interrupt();
        }

        public Throwable getError() {
            return this.error;
        }
    }

    /**
     * Builds and executes a job that writes an integer sequence into Kafka.
     *
     * <p>Every parallel source subtask emits {@code i2} values: it starts at its own subtask index
     * and advances by the number of parallel subtasks. When {@code z} is set, each subtask
     * scrambles its values with random swaps before emitting them. The source output is
     * rebalanced and written through {@link KafkaTestEnvironment#produceIntoKafka}, with each
     * value routed to partition {@code value % numberOfPartitions}.
     *
     * @param streamExecutionEnvironment environment the job is built on; its parallelism is set to
     *     {@code i} and its restart strategy to "no restart" before the job is executed
     * @param kafkaTestEnvironment supplies the broker list, optional secure properties, the
     *     idempotent producer config and the Kafka sink
     * @param str topic passed to {@code produceIntoKafka}
     * @param i parallelism applied to the environment
     * @param i2 number of values each source subtask generates
     * @param z whether each subtask scrambles its values before emitting
     * @throws Exception if setting up or executing the job fails
     */
    public static void generateRandomizedIntegerSequence(StreamExecutionEnvironment streamExecutionEnvironment, KafkaTestEnvironment kafkaTestEnvironment, String str, int i, final int i2, final boolean z) throws Exception {
        streamExecutionEnvironment.setParallelism(i);
        streamExecutionEnvironment.setRestartStrategy(RestartStrategies.noRestart());

        DataStreamSource<Integer> source = streamExecutionEnvironment.addSource(new RichParallelSourceFunction<Integer>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceFunction.SourceContext<Integer> sourceContext) {
                // Subtask k produces k, k + p, k + 2p, ... where p is the number of parallel subtasks.
                final int step = getRuntimeContext().getNumberOfParallelSubtasks();
                final int[] elements = new int[i2];
                int value = getRuntimeContext().getIndexOfThisSubtask();
                for (int pos = 0; pos < elements.length; pos++) {
                    elements[pos] = value;
                    value += step;
                }

                if (z) {
                    // Scramble by swapping every slot with a randomly chosen slot.
                    final Random random = new Random();
                    for (int pos = 0; pos < elements.length; pos++) {
                        final int otherPos = random.nextInt(elements.length);
                        final int tmp = elements[pos];
                        elements[pos] = elements[otherPos];
                        elements[otherPos] = tmp;
                    }
                }

                // Emit until everything is sent or cancel() has been called.
                for (int pos = 0; this.running && pos < elements.length; pos++) {
                    sourceContext.collect(Integer.valueOf(elements[pos]));
                }
            }

            @Override
            public void cancel() {
                this.running = false;
            }
        });

        // Later putAll calls override earlier keys: broker list, then secure settings, then idempotence config.
        Properties producerProperties = new Properties();
        producerProperties.putAll(FlinkKafkaProducerBase.getPropertiesFromBrokerList(kafkaTestEnvironment.getBrokerConnectionString()));
        if (kafkaTestEnvironment.getSecureProperties() != null) {
            producerProperties.putAll(kafkaTestEnvironment.getSecureProperties());
        }
        producerProperties.putAll(kafkaTestEnvironment.getIdempotentProducerConfig());

        kafkaTestEnvironment.produceIntoKafka(
                source.rebalance(),
                str,
                new KeyedSerializationSchemaWrapper<Integer>(
                        new TypeInformationSerializationSchema<Integer>(BasicTypeInfo.INT_TYPE_INFO, streamExecutionEnvironment.getConfig())),
                producerProperties,
                new FlinkKafkaPartitioner<Integer>() {
                    @Override
                    public int partition(Integer num, byte[] bArr, byte[] bArr2, String str2, int[] iArr) {
                        return num.intValue() % iArr.length;
                    }
                });

        streamExecutionEnvironment.execute("Scrambles int sequence generator");
    }
}
