package org.apache.flink.streaming.connectors.kinesis.testutils;

import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread.class */
public class KinesisEventsGeneratorProducerThread {
    private static final Logger LOG = LoggerFactory.getLogger(KinesisEventsGeneratorProducerThread.class);

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/testutils/KinesisEventsGeneratorProducerThread$EventsGenerator.class */
    private static class EventsGenerator implements SourceFunction<String> {
        private static final Logger LOG = LoggerFactory.getLogger(EventsGenerator.class);
        private boolean running = true;
        private final long limit;

        public EventsGenerator(long j) {
            this.limit = j;
        }

        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            long j = 0;
            while (this.running) {
                Thread.sleep(10L);
                long j2 = j;
                j = j2 + 1;
                String str = j2 + "-" + RandomStringUtils.randomAlphabetic(12);
                sourceContext.collect(str);
                LOG.info("Emitting event {}", str);
                if (j >= this.limit) {
                    break;
                }
            }
            sourceContext.close();
            LOG.info("Stopping events generator");
        }

        public void cancel() {
            this.running = false;
        }
    }

    public static Thread create(final int i, final int i2, final String str, final String str2, final String str3, final String str4, final AtomicReference<Throwable> atomicReference, final int i3, final Configuration configuration) {
        return new Thread(new Runnable() { // from class: org.apache.flink.streaming.connectors.kinesis.testutils.KinesisEventsGeneratorProducerThread.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    StreamExecutionEnvironment createRemoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", i3, configuration, new String[0]);
                    createRemoteEnvironment.setParallelism(i2);
                    DataStreamSource parallelism = createRemoteEnvironment.addSource(new EventsGenerator(i)).setParallelism(1);
                    Properties properties = new Properties();
                    properties.setProperty(AWSConfigConstants.AWS_ACCESS_KEY_ID, str);
                    properties.setProperty(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, str2);
                    properties.setProperty("aws.region", str3);
                    FlinkKinesisProducer flinkKinesisProducer = new FlinkKinesisProducer(new SimpleStringSchema(), properties);
                    flinkKinesisProducer.setFailOnError(true);
                    flinkKinesisProducer.setDefaultStream(str4);
                    flinkKinesisProducer.setDefaultPartition("0");
                    parallelism.addSink(flinkKinesisProducer);
                    KinesisEventsGeneratorProducerThread.LOG.info("Starting producing topology");
                    createRemoteEnvironment.execute("Producing topology");
                    KinesisEventsGeneratorProducerThread.LOG.info("Producing topo finished");
                } catch (Exception e) {
                    KinesisEventsGeneratorProducerThread.LOG.warn("Error while running producing topology", e);
                    atomicReference.set(e);
                }
            }
        });
    }
}
