package org.apache.flink.test.scheduling;

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.SlowTaskDetectorOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.util.InstantiationUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase.class */
public class SpeculativeExecutionITCase {

    @TempDir
    private Path temporaryFolder;
    private static final int MAX_PARALLELISM = 4;
    private static final int NUMBERS_TO_PRODUCE = 10000;
    private static final int FAILURE_COUNT = 20;
    private static final AtomicInteger slowTaskCounter = new AtomicInteger(1);
    private int parallelism;
    private static ConcurrentMap<Integer, Map<Long, Long>> numberCountResults;
    private Map<Long, Long> expectedResult;

    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase$DummyCommitter.class */
    private static class DummyCommitter implements Committer<Tuple3<Integer, Integer, Map<Long, Long>>> {
        private static AtomicBoolean blocked = new AtomicBoolean(false);
        private static AtomicInteger attempts = new AtomicInteger(0);
        private static volatile boolean foundSpeculativeWriter;

        public DummyCommitter() {
            attempts.incrementAndGet();
        }

        public void commit(Collection<Committer.CommitRequest<Tuple3<Integer, Integer, Map<Long, Long>>>> collection) throws InterruptedException {
            Iterator<Committer.CommitRequest<Tuple3<Integer, Integer, Map<Long, Long>>>> it = collection.iterator();
            while (it.hasNext()) {
                Tuple3 tuple3 = (Tuple3) it.next().getCommittable();
                SpeculativeExecutionITCase.numberCountResults.put(tuple3.f0, tuple3.f2);
                if (((Integer) tuple3.f1).intValue() > 0) {
                    foundSpeculativeWriter = true;
                }
            }
            if (blocked.getAndSet(true)) {
                return;
            }
            Thread.sleep(5000L);
        }

        public void close() throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase$DummyCommittingSinkWriter.class */
    public static class DummyCommittingSinkWriter implements CommittingSinkWriter<Long, Tuple3<Integer, Integer, Map<Long, Long>>> {
        private final int subTaskIndex;
        private final int attemptNumber;
        private final Map<Long, Long> numberCountResult = new HashMap();

        public DummyCommittingSinkWriter(int i, int i2) {
            this.subTaskIndex = i;
            this.attemptNumber = i2;
        }

        public void write(Long l, SinkWriter.Context context) throws IOException, InterruptedException {
            this.numberCountResult.merge(l, 1L, (v0, v1) -> {
                return Long.sum(v0, v1);
            });
            SpeculativeExecutionITCase.maybeSleep();
        }

        public void flush(boolean z) {
        }

        public Collection<Tuple3<Integer, Integer, Map<Long, Long>>> prepareCommit() {
            return Collections.singleton(Tuple3.of(Integer.valueOf(this.subTaskIndex), Integer.valueOf(this.attemptNumber), this.numberCountResult));
        }

        public void close() throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Deprecated
    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase$DummyPrecommittingSinkWriter.class */
    public static class DummyPrecommittingSinkWriter implements TwoPhaseCommittingSink.PrecommittingSinkWriter<Long, Tuple3<Integer, Integer, Map<Long, Long>>> {
        private final int subTaskIndex;
        private final int attemptNumber;
        private final Map<Long, Long> numberCountResult = new HashMap();

        public DummyPrecommittingSinkWriter(int i, int i2) {
            this.subTaskIndex = i;
            this.attemptNumber = i2;
        }

        public void write(Long l, SinkWriter.Context context) throws IOException, InterruptedException {
            this.numberCountResult.merge(l, 1L, (v0, v1) -> {
                return Long.sum(v0, v1);
            });
            SpeculativeExecutionITCase.maybeSleep();
        }

        public void flush(boolean z) {
        }

        public Collection<Tuple3<Integer, Integer, Map<Long, Long>>> prepareCommit() {
            return Collections.singleton(Tuple3.of(Integer.valueOf(this.subTaskIndex), Integer.valueOf(this.attemptNumber), this.numberCountResult));
        }

        public void close() throws Exception {
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase$DummySpeculativeOutputFormat.class */
    private static class DummySpeculativeOutputFormat implements OutputFormat<Long>, FinalizeOnMaster, SupportsConcurrentExecutionAttempts {
        private static final long serialVersionUID = 1;
        private static volatile boolean foundSpeculativeAttempt;
        private int taskNumber;
        private boolean taskFailed;
        private final Map<Long, Long> numberCountResult;

        private DummySpeculativeOutputFormat() {
            this.numberCountResult = new HashMap();
        }

        public void configure(Configuration configuration) {
        }

        public void open(OutputFormat.InitializationContext initializationContext) throws IOException {
            this.taskNumber = initializationContext.getTaskNumber();
        }

        public void writeRecord(Long l) throws IOException {
            try {
                this.numberCountResult.merge(l, 1L, (v0, v1) -> {
                    return Long.sum(v0, v1);
                });
                if (this.taskNumber == 0) {
                    SpeculativeExecutionITCase.maybeSleep();
                }
            } catch (Throwable th) {
                this.taskFailed = true;
            }
        }

        public void close() throws IOException {
            if (this.taskFailed) {
                return;
            }
            SpeculativeExecutionITCase.numberCountResults.put(Integer.valueOf(this.taskNumber), this.numberCountResult);
        }

        public void finalizeGlobal(FinalizeOnMaster.FinalizationContext finalizationContext) throws IOException {
            for (int i = 0; i < finalizationContext.getParallelism(); i++) {
                if (finalizationContext.getFinishedAttempt(i) != 0) {
                    foundSpeculativeAttempt = true;
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase$NonSpeculativeSinkFunction.class */
    private static class NonSpeculativeSinkFunction extends RichSinkFunction<Long> {
        private final Map<Long, Long> numberCountResult;

        private NonSpeculativeSinkFunction() {
            this.numberCountResult = new HashMap();
        }

        public void invoke(Long l, SinkFunction.Context context) throws Exception {
            if (SpeculativeExecutionITCase.slowTaskCounter.getAndDecrement() > 0) {
                Thread.sleep(5000L);
            }
            this.numberCountResult.merge(l, 1L, (v0, v1) -> {
                return Long.sum(v0, v1);
            });
        }

        public void finish() {
            if (getRuntimeContext().getTaskInfo().getAttemptNumber() == 0) {
                SpeculativeExecutionITCase.numberCountResults.put(Integer.valueOf(getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()), this.numberCountResult);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase$NumberCounterMap.class */
    private static class NumberCounterMap extends RichMapFunction<Long, Long> {
        private static final AtomicInteger toFailCounter = new AtomicInteger(0);

        private NumberCounterMap() {
        }

        public Long map(Long l) throws Exception {
            if (toFailCounter.decrementAndGet() >= 0) {
                throw new Exception("Forced failure for testing");
            }
            SpeculativeExecutionITCase.maybeSleep();
            return l;
        }

        private static void reset() {
            toFailCounter.set(0);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase$NumberCounterSink.class */
    public static class NumberCounterSink extends RichSinkFunction<Long> {
        private final Map<Long, Long> numberCountResult;

        private NumberCounterSink() {
            this.numberCountResult = new HashMap();
        }

        public void invoke(Long l, SinkFunction.Context context) throws Exception {
            this.numberCountResult.merge(l, 1L, (v0, v1) -> {
                return Long.sum(v0, v1);
            });
        }

        public void finish() {
            SpeculativeExecutionITCase.numberCountResults.put(Integer.valueOf(getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()), this.numberCountResult);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase$SpeculativeSink.class */
    private static class SpeculativeSink implements Sink<Long>, SupportsCommitter<Tuple3<Integer, Integer, Map<Long, Long>>>, SupportsConcurrentExecutionAttempts {
        private SpeculativeSink() {
        }

        public SinkWriter<Long> createWriter(Sink.InitContext initContext) {
            throw new UnsupportedOperationException("Not supported");
        }

        /* renamed from: createWriter, reason: merged with bridge method [inline-methods] */
        public CommittingSinkWriter<Long, Tuple3<Integer, Integer, Map<Long, Long>>> m945createWriter(WriterInitContext writerInitContext) {
            return new DummyCommittingSinkWriter(writerInitContext.getTaskInfo().getIndexOfThisSubtask(), writerInitContext.getTaskInfo().getAttemptNumber());
        }

        public Committer<Tuple3<Integer, Integer, Map<Long, Long>>> createCommitter(CommitterInitContext committerInitContext) {
            return new DummyCommitter();
        }

        public SimpleVersionedSerializer<Tuple3<Integer, Integer, Map<Long, Long>>> getCommittableSerializer() {
            return new SimpleVersionedSerializer<Tuple3<Integer, Integer, Map<Long, Long>>>() { // from class: org.apache.flink.test.scheduling.SpeculativeExecutionITCase.SpeculativeSink.1
                public int getVersion() {
                    return 0;
                }

                public byte[] serialize(Tuple3<Integer, Integer, Map<Long, Long>> tuple3) throws IOException {
                    return InstantiationUtil.serializeObject(tuple3);
                }

                /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
                public Tuple3<Integer, Integer, Map<Long, Long>> m946deserialize(int i, byte[] bArr) throws IOException {
                    try {
                        return (Tuple3) InstantiationUtil.deserializeObject(bArr, Thread.currentThread().getContextClassLoader());
                    } catch (ClassNotFoundException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
        }
    }

    @Deprecated
    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase$SpeculativeSinkDeprecated.class */
    private static class SpeculativeSinkDeprecated implements TwoPhaseCommittingSink<Long, Tuple3<Integer, Integer, Map<Long, Long>>>, SupportsConcurrentExecutionAttempts {
        private SpeculativeSinkDeprecated() {
        }

        /* renamed from: createWriter, reason: merged with bridge method [inline-methods] */
        public TwoPhaseCommittingSink.PrecommittingSinkWriter<Long, Tuple3<Integer, Integer, Map<Long, Long>>> m947createWriter(Sink.InitContext initContext) {
            return new DummyPrecommittingSinkWriter(initContext.getTaskInfo().getIndexOfThisSubtask(), initContext.getTaskInfo().getAttemptNumber());
        }

        public Committer<Tuple3<Integer, Integer, Map<Long, Long>>> createCommitter(CommitterInitContext committerInitContext) {
            return new DummyCommitter();
        }

        public SimpleVersionedSerializer<Tuple3<Integer, Integer, Map<Long, Long>>> getCommittableSerializer() {
            return new SimpleVersionedSerializer<Tuple3<Integer, Integer, Map<Long, Long>>>() { // from class: org.apache.flink.test.scheduling.SpeculativeExecutionITCase.SpeculativeSinkDeprecated.1
                public int getVersion() {
                    return 0;
                }

                public byte[] serialize(Tuple3<Integer, Integer, Map<Long, Long>> tuple3) throws IOException {
                    return InstantiationUtil.serializeObject(tuple3);
                }

                /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
                public Tuple3<Integer, Integer, Map<Long, Long>> m948deserialize(int i, byte[] bArr) throws IOException {
                    try {
                        return (Tuple3) InstantiationUtil.deserializeObject(bArr, Thread.currentThread().getContextClassLoader());
                    } catch (ClassNotFoundException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase$SpeculativeSinkFunction.class */
    private static class SpeculativeSinkFunction extends RichSinkFunction<Long> implements SupportsConcurrentExecutionAttempts {
        private final Map<Long, Long> numberCountResult;

        private SpeculativeSinkFunction() {
            this.numberCountResult = new HashMap();
        }

        public void invoke(Long l, SinkFunction.Context context) throws Exception {
            this.numberCountResult.merge(l, 1L, (v0, v1) -> {
                return Long.sum(v0, v1);
            });
            SpeculativeExecutionITCase.maybeSleep();
        }

        public void finish() {
            SpeculativeExecutionITCase.numberCountResults.put(Integer.valueOf(getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()), this.numberCountResult);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase$TestingInputFormat.class */
    private static class TestingInputFormat extends GenericInputFormat<Long> {
        private static final int NUM_SPLITS = 100;
        private long nextNumberToEmit;
        private long maxNumberToEmit;
        private boolean end;

        private TestingInputFormat() {
        }

        /* renamed from: createInputSplits, reason: merged with bridge method [inline-methods] */
        public GenericInputSplit[] m949createInputSplits(int i) {
            GenericInputSplit[] genericInputSplitArr = new GenericInputSplit[100];
            for (int i2 = 0; i2 < 100; i2++) {
                genericInputSplitArr[i2] = new GenericInputSplit(i2, 100);
            }
            return genericInputSplitArr;
        }

        public boolean reachedEnd() {
            return this.end;
        }

        /*  JADX ERROR: Failed to decode insn: 0x0014: MOVE_MULTI, method: org.apache.flink.test.scheduling.SpeculativeExecutionITCase.TestingInputFormat.nextRecord(java.lang.Long):java.lang.Long
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        public java.lang.Long nextRecord(java.lang.Long r9) {
            /*
                r8 = this;
                org.apache.flink.test.scheduling.SpeculativeExecutionITCase.access$1500()
                r0 = r8
                long r0 = r0.nextNumberToEmit
                r1 = r8
                long r1 = r1.maxNumberToEmit
                int r0 = (r0 > r1 ? 1 : (r0 == r1 ? 0 : -1))
                if (r0 > 0) goto L1e
                r0 = r8
                r1 = r0
                long r1 = r1.nextNumberToEmit
                // decode failed: arraycopy: source index -1 out of bounds for object array[8]
                r2 = 1
                long r1 = r1 + r2
                r0.nextNumberToEmit = r1
                java.lang.Long.valueOf(r-1)
                return r-1
                r0 = r8
                r1 = 1
                r0.end = r1
                r0 = 0
                return r0
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.test.scheduling.SpeculativeExecutionITCase.TestingInputFormat.nextRecord(java.lang.Long):java.lang.Long");
        }

        public void open(GenericInputSplit genericInputSplit) throws IOException {
            super.open(genericInputSplit);
            this.nextNumberToEmit = (this.partitionNumber * SpeculativeExecutionITCase.NUMBERS_TO_PRODUCE) / 100;
            this.maxNumberToEmit = (((this.partitionNumber + 1) * SpeculativeExecutionITCase.NUMBERS_TO_PRODUCE) / 100) - 1;
            this.end = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase$TestingIteratorSourceReader.class */
    private static class TestingIteratorSourceReader<E, IT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IT>> extends IteratorSourceReader<E, IT, SplitT> {
        private TestingIteratorSourceReader(SourceReaderContext sourceReaderContext) {
            super(sourceReaderContext);
        }

        public InputStatus pollNext(ReaderOutput<E> readerOutput) {
            SpeculativeExecutionITCase.maybeSleep();
            return super.pollNext(readerOutput);
        }

        public void close() throws Exception {
            if (TestingNumberSequenceSource.forceFailureCounter.get() > 0) {
                TestingNumberSequenceSource.forceFailureCounter.decrementAndGet();
                throw new RuntimeException("Forced failure for testing");
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase$TestingNumberSequenceSource.class */
    public static class TestingNumberSequenceSource extends NumberSequenceSource {
        private final boolean forceFailureFlag;
        public static AtomicInteger forceFailureCounter = new AtomicInteger(0);

        private TestingNumberSequenceSource(boolean z) {
            super(0L, 9999L);
            this.forceFailureFlag = z;
            if (z) {
                forceFailureCounter = new AtomicInteger(1);
            }
        }

        public SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> createReader(SourceReaderContext sourceReaderContext) {
            return new TestingIteratorSourceReader(sourceReaderContext);
        }

        public SplitEnumerator<NumberSequenceSource.NumberSequenceSplit, Collection<NumberSequenceSource.NumberSequenceSplit>> createEnumerator(SplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> splitEnumeratorContext) {
            int currentParallelism = splitEnumeratorContext.currentParallelism();
            if (this.forceFailureFlag) {
                currentParallelism = 1;
            }
            return new IteratorSourceEnumerator(splitEnumeratorContext, splitNumberRange(0L, 9999L, currentParallelism));
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeExecutionITCase$TestingSourceFunc.class */
    private static class TestingSourceFunc extends RichParallelSourceFunction<Long> {
        private TestingSourceFunc() {
        }

        public void run(SourceFunction.SourceContext<Long> sourceContext) throws Exception {
            SpeculativeExecutionITCase.maybeSleep();
            int indexOfThisSubtask = getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            int numberOfParallelSubtasks = getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
            long j = (indexOfThisSubtask * SpeculativeExecutionITCase.NUMBERS_TO_PRODUCE) / numberOfParallelSubtasks;
            long j2 = ((indexOfThisSubtask + 1) * SpeculativeExecutionITCase.NUMBERS_TO_PRODUCE) / numberOfParallelSubtasks;
            long j3 = j;
            while (true) {
                long j4 = j3;
                if (j4 >= j2) {
                    return;
                }
                sourceContext.collect(Long.valueOf(j4));
                j3 = j4 + 1;
            }
        }

        public void cancel() {
        }
    }

    SpeculativeExecutionITCase() {
    }

    @BeforeEach
    void setUp() {
        this.parallelism = MAX_PARALLELISM;
        slowTaskCounter.set(1);
        this.expectedResult = (Map) LongStream.range(0L, 10000L).boxed().collect(Collectors.toMap(Function.identity(), l -> {
            return 1L;
        }));
        NumberCounterMap.toFailCounter.set(0);
        numberCountResults = new ConcurrentHashMap();
    }

    @Test
    void testSpeculativeExecution() throws Exception {
        executeJob(this::setupJobWithSlowMap);
        waitUntilJobArchived();
        checkResults();
    }

    @Test
    void testSpeculativeExecutionWithFailover() throws Exception {
        NumberCounterMap.toFailCounter.set(FAILURE_COUNT);
        executeJob(this::setupJobWithSlowMap);
        waitUntilJobArchived();
        checkResults();
    }

    @Test
    void testSpeculativeExecutionWithAdaptiveParallelism() throws Exception {
        this.parallelism = -1;
        executeJob(this::setupJobWithSlowMap);
        waitUntilJobArchived();
        checkResults();
    }

    @Test
    void testBlockSlowNodeInSpeculativeExecution() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(BatchExecutionOptions.BLOCK_SLOW_NODE_DURATION, Duration.ofMinutes(1L));
        JobClient executeJobAsync = executeJobAsync(configuration, this::setupJobWithSlowMap);
        Assertions.assertThatThrownBy(() -> {
        }, "The local node is expected to be blocked but it is not.", new Object[0]).isInstanceOf(TimeoutException.class);
    }

    @Test
    void testSpeculativeExecutionOfSourceFunction() throws Exception {
        executeJob(this::setupJobWithSlowSourceFunction);
        waitUntilJobArchived();
        checkResults();
    }

    @Test
    void testSpeculativeExecutionOfInputFormatSource() throws Exception {
        executeJob(this::setupJobWithSlowInputFormatSource);
        waitUntilJobArchived();
        checkResults();
    }

    @Test
    void testSpeculativeExecutionOfNewSource() throws Exception {
        executeJob(this::setupJobWithSlowNewSource);
        waitUntilJobArchived();
        checkResults();
    }

    @Test
    void testSpeculativeExecutionOfNewSourceWithFailure() throws Exception {
        executeJob(streamExecutionEnvironment -> {
            setupJobWithSlowNewSource(streamExecutionEnvironment, true);
        });
        waitUntilJobArchived();
        checkResults();
    }

    @Test
    public void testSpeculativeSlowSink() throws Exception {
        executeJob(this::setupSpeculativeSlowSink);
        waitUntilJobArchived();
        checkResults();
        Assertions.assertThat(DummyCommitter.attempts.get()).isEqualTo(this.parallelism);
        Assertions.assertThat(DummyCommitter.foundSpeculativeWriter).isTrue();
    }

    @Test
    @Deprecated
    public void testSpeculativeSlowSinkDeprecated() throws Exception {
        executeJob(this::setupSpeculativeSlowSinkDeprecated);
        waitUntilJobArchived();
        checkResults();
        Assertions.assertThat(DummyCommitter.attempts.get()).isEqualTo(this.parallelism);
        Assertions.assertThat(DummyCommitter.foundSpeculativeWriter).isTrue();
    }

    @Test
    public void testNonSpeculativeSlowSinkFunction() throws Exception {
        executeJob(this::setupNonSpeculativeSlowSinkFunction);
        waitUntilJobArchived();
        checkResults();
    }

    @Test
    public void testSpeculativeSlowSinkFunction() throws Exception {
        executeJob(this::setupSpeculativeSlowSinkFunction);
        waitUntilJobArchived();
        checkResults();
    }

    @Test
    public void testSpeculativeOutputFormatSink() throws Exception {
        executeJob(this::setupSlowOutputFormatSink);
        waitUntilJobArchived();
        checkResults();
        Assertions.assertThat(DummySpeculativeOutputFormat.foundSpeculativeAttempt).isTrue();
    }

    private void checkResults() {
        Assertions.assertThat((Map) numberCountResults.values().stream().flatMap(map -> {
            return map.entrySet().stream();
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }, (v0, v1) -> {
            return Long.sum(v0, v1);
        }))).isEqualTo(this.expectedResult);
    }

    private void executeJob(Consumer<StreamExecutionEnvironment> consumer) throws Exception {
        executeJobAsync(new Configuration(), consumer).getJobExecutionResult().get();
    }

    private JobClient executeJobAsync(Configuration configuration, Consumer<StreamExecutionEnvironment> consumer) throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment(configure(configuration));
        createLocalEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        createLocalEnvironment.setParallelism(-1);
        consumer.accept(createLocalEnvironment);
        return createLocalEnvironment.executeAsync();
    }

    private Configuration configure(Configuration configuration) {
        configuration.set(RestOptions.BIND_PORT, "0");
        configuration.set(JobManagerOptions.ARCHIVE_DIR, this.temporaryFolder.getRoot().toString());
        configuration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, Duration.ofMillis(5000L));
        configuration.set(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE, 1);
        configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb"));
        configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, Integer.valueOf(MAX_PARALLELISM));
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, RestartStrategyOptions.RestartStrategyType.FIXED_DELAY.getMainValue());
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
        configuration.set(BatchExecutionOptions.SPECULATIVE_ENABLED, true);
        if (!configuration.contains(BatchExecutionOptions.BLOCK_SLOW_NODE_DURATION)) {
            configuration.set(BatchExecutionOptions.BLOCK_SLOW_NODE_DURATION, Duration.ZERO);
        }
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER, Double.valueOf(1.0d));
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO, Double.valueOf(0.2d));
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND, Duration.ofMillis(0L));
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM, Integer.valueOf(MAX_PARALLELISM));
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM, 1);
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, Integer.valueOf(MAX_PARALLELISM));
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK, MemorySize.parse("150kb"));
        return configuration;
    }

    private void waitUntilJobArchived() throws InterruptedException {
        while (this.temporaryFolder.getRoot().toFile().listFiles().length < 1) {
            Thread.sleep(1000L);
        }
    }

    private void setupJobWithSlowMap(StreamExecutionEnvironment streamExecutionEnvironment) {
        addSink(streamExecutionEnvironment.fromSequence(0L, 9999L).setParallelism(this.parallelism).name("source").slotSharingGroup("sourceGroup").rebalance().map(new NumberCounterMap()).setParallelism(this.parallelism).name("map").slotSharingGroup("mapGroup"));
    }

    private void setupJobWithSlowSourceFunction(StreamExecutionEnvironment streamExecutionEnvironment) {
        addSink(new DataStreamSource(streamExecutionEnvironment, BasicTypeInfo.LONG_TYPE_INFO, new StreamSource(new TestingSourceFunc()), true, "source", Boundedness.BOUNDED).setParallelism(this.parallelism).slotSharingGroup("sourceGroup"));
    }

    private void setupJobWithSlowInputFormatSource(StreamExecutionEnvironment streamExecutionEnvironment) {
        addSink(new DataStreamSource(streamExecutionEnvironment, BasicTypeInfo.LONG_TYPE_INFO, new StreamSource(new InputFormatSourceFunction(new TestingInputFormat(), TypeInformation.of(Long.class))), true, "source", Boundedness.BOUNDED).setParallelism(this.parallelism).slotSharingGroup("sourceGroup"));
    }

    private void setupJobWithSlowNewSource(StreamExecutionEnvironment streamExecutionEnvironment) {
        setupJobWithSlowNewSource(streamExecutionEnvironment, false);
    }

    private void setupJobWithSlowNewSource(StreamExecutionEnvironment streamExecutionEnvironment, boolean z) {
        addSink(streamExecutionEnvironment.fromSource(new TestingNumberSequenceSource(z), WatermarkStrategy.noWatermarks(), "source"));
    }

    private void setupSpeculativeSlowSink(StreamExecutionEnvironment streamExecutionEnvironment) {
        DummyCommitter.attempts.set(0);
        DummyCommitter.blocked.set(false);
        boolean unused = DummyCommitter.foundSpeculativeWriter = false;
        streamExecutionEnvironment.fromSequence(0L, 9999L).setParallelism(this.parallelism).name("source").slotSharingGroup("sourceGroup").sinkTo(new SpeculativeSink()).setParallelism(this.parallelism).name("sink").slotSharingGroup("sinkGroup");
    }

    @Deprecated
    private void setupSpeculativeSlowSinkDeprecated(StreamExecutionEnvironment streamExecutionEnvironment) {
        DummyCommitter.attempts.set(0);
        DummyCommitter.blocked.set(false);
        boolean unused = DummyCommitter.foundSpeculativeWriter = false;
        streamExecutionEnvironment.fromSequence(0L, 9999L).setParallelism(this.parallelism).name("source").slotSharingGroup("sourceGroup").sinkTo(new SpeculativeSinkDeprecated()).setParallelism(this.parallelism).name("sink").slotSharingGroup("sinkGroup");
    }

    private void setupNonSpeculativeSlowSinkFunction(StreamExecutionEnvironment streamExecutionEnvironment) {
        streamExecutionEnvironment.fromSequence(0L, 9999L).setParallelism(this.parallelism).name("source").slotSharingGroup("sourceGroup").addSink(new NonSpeculativeSinkFunction()).setParallelism(this.parallelism).name("sink").slotSharingGroup("sinkGroup");
    }

    private void setupSpeculativeSlowSinkFunction(StreamExecutionEnvironment streamExecutionEnvironment) {
        streamExecutionEnvironment.fromSequence(0L, 9999L).setParallelism(this.parallelism).name("source").slotSharingGroup("sourceGroup").addSink(new SpeculativeSinkFunction()).setParallelism(this.parallelism).name("sink").slotSharingGroup("sinkGroup");
    }

    private void setupSlowOutputFormatSink(StreamExecutionEnvironment streamExecutionEnvironment) {
        streamExecutionEnvironment.fromSequence(0L, 9999L).setParallelism(this.parallelism).name("source").slotSharingGroup("group1").rebalance().writeUsingOutputFormat(new DummySpeculativeOutputFormat()).setParallelism(this.parallelism).name("sink").slotSharingGroup("group3");
    }

    private void addSink(DataStream<Long> dataStream) {
        dataStream.rebalance().addSink(new NumberCounterSink()).setParallelism(this.parallelism).name("sink").slotSharingGroup("sinkGroup");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void maybeSleep() {
        if (slowTaskCounter.getAndDecrement() > 0) {
            try {
                Thread.sleep(2147483647L);
            } catch (Exception e) {
                throw new RuntimeException();
            }
        }
    }

    static /* synthetic */ void access$1500() {
        maybeSleep();
    }
}
