package org.apache.flink.streaming.connectors.kinesis.testutils;

import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread.class */
public class ExactlyOnceValidatingConsumerThread {
    private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceValidatingConsumerThread.class);

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread$ArtificialFailOnceFlatMapper.class */
    private static class ArtificialFailOnceFlatMapper extends RichFlatMapFunction<String, String> {
        int count = 0;
        private final int failAtRecordCount;

        public ArtificialFailOnceFlatMapper(int i) {
            this.failAtRecordCount = i;
        }

        public void flatMap(String str, Collector<String> collector) throws Exception {
            int i = this.count;
            this.count = i + 1;
            if (i >= this.failAtRecordCount && getRuntimeContext().getAttemptNumber() == 0) {
                throw new RuntimeException("Artificial failure. Restart please.");
            }
            collector.collect(str);
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((String) obj, (Collector<String>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/kinesis/testutils/ExactlyOnceValidatingConsumerThread$ExactlyOnceValidatingMapper.class */
    private static class ExactlyOnceValidatingMapper implements FlatMapFunction<String, String>, ListCheckpointed<BitSet> {
        private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceValidatingMapper.class);
        private final int totalEventCount;
        private BitSet validator;

        public ExactlyOnceValidatingMapper(int i) {
            this.totalEventCount = i;
            this.validator = new BitSet(i);
        }

        public void flatMap(String str, Collector<String> collector) throws Exception {
            LOG.info("Consumed {}", str);
            int parseInt = Integer.parseInt(str.split("-")[0]);
            if (this.validator.get(parseInt)) {
                throw new RuntimeException("Saw id " + parseInt + " twice!");
            }
            this.validator.set(parseInt);
            if (parseInt > this.totalEventCount - 1) {
                throw new RuntimeException("Out of bounds ID observed");
            }
            if (this.validator.nextClearBit(0) == this.totalEventCount) {
                throw new SuccessException();
            }
        }

        public List<BitSet> snapshotState(long j, long j2) throws Exception {
            return Collections.singletonList(this.validator);
        }

        public void restoreState(List<BitSet> list) throws Exception {
            if (list.size() == 1) {
                this.validator = list.get(0);
            } else {
                Preconditions.checkState(list.isEmpty());
            }
        }

        public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
            flatMap((String) obj, (Collector<String>) collector);
        }
    }

    public static Thread create(final int i, final int i2, final int i3, final int i4, final long j, final String str, final String str2, final String str3, final String str4, final AtomicReference<Throwable> atomicReference, final int i5, final Configuration configuration) {
        return new Thread(new Runnable() { // from class: org.apache.flink.streaming.connectors.kinesis.testutils.ExactlyOnceValidatingConsumerThread.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    StreamExecutionEnvironment createRemoteEnvironment = StreamExecutionEnvironment.createRemoteEnvironment("localhost", i5, configuration, new String[0]);
                    createRemoteEnvironment.setParallelism(i3);
                    createRemoteEnvironment.enableCheckpointing(i4);
                    createRemoteEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, j));
                    Properties properties = new Properties();
                    properties.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, str);
                    properties.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, str2);
                    properties.setProperty("aws.region", str3);
                    properties.setProperty("flink.stream.initpos", ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
                    createRemoteEnvironment.addSource(new FlinkKinesisConsumer(str4, new SimpleStringSchema(), properties)).flatMap(new ArtificialFailOnceFlatMapper(i2)).flatMap(new ExactlyOnceValidatingMapper(i)).setParallelism(1);
                    ExactlyOnceValidatingConsumerThread.LOG.info("Starting consuming topology");
                    org.apache.flink.test.util.TestUtils.tryExecute(createRemoteEnvironment, "Consuming topo");
                    ExactlyOnceValidatingConsumerThread.LOG.info("Consuming topo finished");
                } catch (Exception e) {
                    ExactlyOnceValidatingConsumerThread.LOG.warn("Error while running consuming topology", e);
                    atomicReference.set(e);
                }
            }
        });
    }
}
