package org.apache.flink.test.scheduling;

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.SlowTaskDetectorOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeSchedulerITCase.class */
public class SpeculativeSchedulerITCase {

    // Directory injected through JUnit's @TempDir; read by configure() and waitUntilJobArchived().
    @TempDir
    private Path temporaryFolder;
    // Used as the default parallelism, the slot count and the adaptive-scheduler parallelism bound.
    private static final int MAX_PARALLELISM = 4;
    // The sources in this file emit the numbers 0 .. NUMBERS_TO_PRODUCE - 1.
    private static final int NUMBERS_TO_PRODUCE = 10000;
    // Number of forced map failures armed by testSpeculativeExecutionWithFailover().
    private static final int FAILURE_COUNT = 20;
    // maybeSleep() decrements this counter and blocks whenever the previous value was positive;
    // setUp() resets it to 1 before each test.
    private static final AtomicInteger slowTaskCounter = new AtomicInteger(1);
    // Parallelism applied to the operators of the test job; -1 is set by the adaptive-parallelism test.
    private int parallelism;
    // Per-subtask counts published by NumberCounterSink.finish(), keyed by subtask index.
    private static ConcurrentMap<Integer, Map<Long, Long>> numberCountResults;
    // Expected aggregated outcome built in setUp(): every number maps to a count of 1.
    private Map<Long, Long> expectedResult;

    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeSchedulerITCase$NumberCounterMap.class */
    private static class NumberCounterMap extends RichMapFunction<Long, Long> {
        private static final AtomicInteger toFailCounter = new AtomicInteger(0);

        private NumberCounterMap() {
        }

        public Long map(Long l) throws Exception {
            if (toFailCounter.decrementAndGet() >= 0) {
                throw new Exception("Forced failure for testing");
            }
            SpeculativeSchedulerITCase.maybeSleep();
            return l;
        }

        private static void reset() {
            toFailCounter.set(0);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeSchedulerITCase$NumberCounterSink.class */
    public static class NumberCounterSink extends RichSinkFunction<Long> {
        private final Map<Long, Long> numberCountResult;

        private NumberCounterSink() {
            this.numberCountResult = new HashMap();
        }

        public void invoke(Long l, SinkFunction.Context context) throws Exception {
            this.numberCountResult.merge(l, 1L, (v0, v1) -> {
                return Long.sum(v0, v1);
            });
        }

        public void finish() {
            SpeculativeSchedulerITCase.numberCountResults.put(Integer.valueOf(getRuntimeContext().getIndexOfThisSubtask()), this.numberCountResult);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeSchedulerITCase$TestingInputFormat.class */
    private static class TestingInputFormat extends GenericInputFormat<Long> {
        private static final int NUM_SPLITS = 100;
        private long nextNumberToEmit;
        private long maxNumberToEmit;
        private boolean end;

        private TestingInputFormat() {
        }

        /* renamed from: createInputSplits, reason: merged with bridge method [inline-methods] */
        public GenericInputSplit[] m930createInputSplits(int i) {
            GenericInputSplit[] genericInputSplitArr = new GenericInputSplit[100];
            for (int i2 = 0; i2 < 100; i2++) {
                genericInputSplitArr[i2] = new GenericInputSplit(i2, 100);
            }
            return genericInputSplitArr;
        }

        public boolean reachedEnd() {
            return this.end;
        }

        /*  JADX ERROR: Failed to decode insn: 0x0014: MOVE_MULTI, method: org.apache.flink.test.scheduling.SpeculativeSchedulerITCase.TestingInputFormat.nextRecord(java.lang.Long):java.lang.Long
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        public java.lang.Long nextRecord(java.lang.Long r9) {
            /*
                r8 = this;
                org.apache.flink.test.scheduling.SpeculativeSchedulerITCase.access$600()
                r0 = r8
                long r0 = r0.nextNumberToEmit
                r1 = r8
                long r1 = r1.maxNumberToEmit
                int r0 = (r0 > r1 ? 1 : (r0 == r1 ? 0 : -1))
                if (r0 > 0) goto L1e
                r0 = r8
                r1 = r0
                long r1 = r1.nextNumberToEmit
                // decode failed: arraycopy: source index -1 out of bounds for object array[8]
                r2 = 1
                long r1 = r1 + r2
                r0.nextNumberToEmit = r1
                java.lang.Long.valueOf(r-1)
                return r-1
                r0 = r8
                r1 = 1
                r0.end = r1
                r0 = 0
                return r0
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.test.scheduling.SpeculativeSchedulerITCase.TestingInputFormat.nextRecord(java.lang.Long):java.lang.Long");
        }

        public void open(GenericInputSplit genericInputSplit) throws IOException {
            super.open(genericInputSplit);
            this.nextNumberToEmit = (this.partitionNumber * SpeculativeSchedulerITCase.NUMBERS_TO_PRODUCE) / 100;
            this.maxNumberToEmit = (((this.partitionNumber + 1) * SpeculativeSchedulerITCase.NUMBERS_TO_PRODUCE) / 100) - 1;
            this.end = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeSchedulerITCase$TestingIteratorSourceReader.class */
    private static class TestingIteratorSourceReader<E, IT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IT>> extends IteratorSourceReader<E, IT, SplitT> {
        private TestingIteratorSourceReader(SourceReaderContext sourceReaderContext) {
            super(sourceReaderContext);
        }

        public InputStatus pollNext(ReaderOutput<E> readerOutput) {
            SpeculativeSchedulerITCase.maybeSleep();
            return super.pollNext(readerOutput);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeSchedulerITCase$TestingNumberSequenceSource.class */
    private static class TestingNumberSequenceSource extends NumberSequenceSource {
        private TestingNumberSequenceSource() {
            super(0L, 9999L);
        }

        public SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> createReader(SourceReaderContext sourceReaderContext) {
            return new TestingIteratorSourceReader(sourceReaderContext);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/scheduling/SpeculativeSchedulerITCase$TestingSourceFunc.class */
    private static class TestingSourceFunc extends RichParallelSourceFunction<Long> {
        private TestingSourceFunc() {
        }

        public void run(SourceFunction.SourceContext<Long> sourceContext) throws Exception {
            SpeculativeSchedulerITCase.maybeSleep();
            int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
            int numberOfParallelSubtasks = getRuntimeContext().getNumberOfParallelSubtasks();
            long j = (indexOfThisSubtask * SpeculativeSchedulerITCase.NUMBERS_TO_PRODUCE) / numberOfParallelSubtasks;
            long j2 = ((indexOfThisSubtask + 1) * SpeculativeSchedulerITCase.NUMBERS_TO_PRODUCE) / numberOfParallelSubtasks;
            long j3 = j;
            while (true) {
                long j4 = j3;
                if (j4 >= j2) {
                    return;
                }
                sourceContext.collect(Long.valueOf(j4));
                j3 = j4 + 1;
            }
        }

        public void cancel() {
        }
    }

    /** Package-private no-argument constructor; per-test state is initialised in setUp(). */
    SpeculativeSchedulerITCase() {
    }

    /**
     * Resets all state shared between tests: the default parallelism, the slow-task counter, the
     * expected result (every number in {@code 0 .. NUMBERS_TO_PRODUCE - 1} counted once), the
     * forced-failure counter of {@link NumberCounterMap} and the map collecting sink results.
     */
    @BeforeEach
    void setUp() {
        this.parallelism = MAX_PARALLELISM;
        slowTaskCounter.set(1);
        this.expectedResult = LongStream.range(0L, NUMBERS_TO_PRODUCE)
                .boxed()
                .collect(Collectors.toMap(Function.identity(), number -> 1L));
        NumberCounterMap.reset();
        numberCountResults = new ConcurrentHashMap<>();
    }

    /** Runs the slow-map job, waits for its archive and verifies the collected counts. */
    @Test
    void testSpeculativeExecution() throws Exception {
        final Consumer<StreamExecutionEnvironment> jobSetup = this::setupJobWithSlowMap;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /**
     * Same as the plain slow-map test, but arms {@link NumberCounterMap} to fail
     * {@code FAILURE_COUNT} invocations first.
     */
    @Test
    void testSpeculativeExecutionWithFailover() throws Exception {
        final int forcedFailures = FAILURE_COUNT;
        NumberCounterMap.toFailCounter.set(forcedFailures);
        final Consumer<StreamExecutionEnvironment> jobSetup = this::setupJobWithSlowMap;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /** Runs the slow-map job with the operator parallelism set to {@code -1}. */
    @Test
    void testSpeculativeExecutionWithAdaptiveParallelism() throws Exception {
        final int unsetParallelism = -1;
        this.parallelism = unsetParallelism;
        final Consumer<StreamExecutionEnvironment> jobSetup = this::setupJobWithSlowMap;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /**
     * Configures a one-minute block duration for slow nodes, submits the slow-map job and
     * expects that waiting for its result times out.
     *
     * <p>The decompiled version passed an empty lambda to {@code assertThatThrownBy} and never
     * used the job client, so the assertion could not observe a {@link TimeoutException}. The
     * timed wait on the job result is restored here.
     */
    @Test
    void testBlockSlowNodeInSpeculativeExecution() throws Exception {
        final Configuration configuration = new Configuration();
        configuration.set(JobManagerOptions.BLOCK_SLOW_NODE_DURATION, Duration.ofMinutes(1L));
        final JobClient jobClient = executeJobAsync(configuration, this::setupJobWithSlowMap);
        // The wait has to be clearly shorter than the one-minute block duration configured above.
        final long waitSeconds = 30L;
        Assertions.assertThatThrownBy(
                        () -> jobClient.getJobExecutionResult().get(waitSeconds, TimeUnit.SECONDS),
                        "The local node is expected to be blocked but it is not.")
                .isInstanceOf(TimeoutException.class);
    }

    /** Runs the job whose source is {@link TestingSourceFunc} and verifies the collected counts. */
    @Test
    void testSpeculativeExecutionOfSourceFunction() throws Exception {
        final Consumer<StreamExecutionEnvironment> jobSetup = this::setupJobWithSlowSourceFunction;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /** Runs the job whose source reads from {@link TestingInputFormat} and verifies the collected counts. */
    @Test
    void testSpeculativeExecutionOfInputFormatSource() throws Exception {
        final Consumer<StreamExecutionEnvironment> jobSetup = this::setupJobWithSlowInputFormatSource;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /** Runs the job whose source is {@link TestingNumberSequenceSource} and verifies the collected counts. */
    @Test
    void testSpeculativeExecutionOfNewSource() throws Exception {
        final Consumer<StreamExecutionEnvironment> jobSetup = this::setupJobWithSlowNewSource;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /**
     * Sums the per-subtask counts published by the sinks into one map and asserts that it equals
     * the expected result.
     */
    private void checkResults() {
        final Map<Long, Long> actualCounts = numberCountResults.values().stream()
                .flatMap(perSubtaskCounts -> perSubtaskCounts.entrySet().stream())
                // The same number may show up in several subtasks; add those counts together.
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, Long::sum));
        Assertions.assertThat(actualCounts).isEqualTo(this.expectedResult);
    }

    /**
     * Submits a job with a fresh configuration and blocks until its execution result is available.
     *
     * @param consumer callback that defines the job on the given environment
     * @throws Exception if submitting the job or waiting for its result fails
     */
    private void executeJob(Consumer<StreamExecutionEnvironment> consumer) throws Exception {
        final Configuration freshConfiguration = new Configuration();
        final JobClient jobClient = executeJobAsync(freshConfiguration, consumer);
        jobClient.getJobExecutionResult().get();
    }

    /**
     * Creates a local batch environment from the given configuration, lets the callback define
     * the job on it and submits the job without waiting for completion.
     *
     * @param configuration base configuration, completed by {@code configure}
     * @param consumer callback that defines the job on the environment
     * @return the client of the submitted job
     * @throws Exception if the job cannot be submitted
     */
    private JobClient executeJobAsync(Configuration configuration, Consumer<StreamExecutionEnvironment> consumer) throws Exception {
        final Configuration effectiveConfiguration = configure(configuration);
        final LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(effectiveConfiguration);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setParallelism(-1);
        consumer.accept(env);
        return env.executeAsync();
    }

    /**
     * Completes the given configuration with the settings shared by all tests and returns it.
     *
     * <p>The archive directory is the injected temporary directory itself. The previous
     * {@code temporaryFolder.getRoot()} call returned the root component of the path (the
     * filesystem root), not the temporary directory.
     *
     * @param configuration configuration to complete; modified in place
     * @return the same configuration instance
     */
    private Configuration configure(Configuration configuration) {
        configuration.setString(RestOptions.BIND_PORT, "0");
        configuration.set(JobManagerOptions.ARCHIVE_DIR, this.temporaryFolder.toAbsolutePath().toString());
        configuration.setLong(JobManagerOptions.SLOT_REQUEST_TIMEOUT, 5000L);
        configuration.set(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE, 1);
        configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb"));
        configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, MAX_PARALLELISM);
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, "fixed-delay");
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);
        configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch);
        configuration.set(JobManagerOptions.SPECULATIVE_ENABLED, true);
        // Keep a block duration chosen by the caller; otherwise use a zero duration.
        if (!configuration.contains(JobManagerOptions.BLOCK_SLOW_NODE_DURATION)) {
            configuration.set(JobManagerOptions.BLOCK_SLOW_NODE_DURATION, Duration.ZERO);
        }
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER, 1.0d);
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO, 0.2d);
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND, Duration.ofMillis(0L));
        configuration.set(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DEFAULT_SOURCE_PARALLELISM, MAX_PARALLELISM);
        configuration.set(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM, 1);
        configuration.set(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM, MAX_PARALLELISM);
        configuration.set(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK, MemorySize.parse("150kb"));
        return configuration;
    }

    /**
     * Polls the temporary archive directory once per second until it contains at least one entry.
     *
     * <p>Lists the temporary directory itself; listing {@code temporaryFolder.getRoot()} inspected
     * the filesystem root instead.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     * @throws IllegalStateException if the archive directory cannot be listed
     */
    private void waitUntilJobArchived() throws InterruptedException {
        while (true) {
            final String[] archivedEntries = this.temporaryFolder.toFile().list();
            if (archivedEntries == null) {
                // list() yields null when the path is not a directory or an I/O error occurs.
                throw new IllegalStateException("Cannot list archive directory " + this.temporaryFolder);
            }
            if (archivedEntries.length >= 1) {
                return;
            }
            Thread.sleep(1000L);
        }
    }

    /**
     * Defines the job "sequence source -> rebalance -> {@link NumberCounterMap} -> sink", with
     * source and map placed in separate slot sharing groups.
     *
     * @param env the environment the job is defined on
     */
    private void setupJobWithSlowMap(StreamExecutionEnvironment env) {
        final DataStream<Long> source = env.fromSequence(0L, NUMBERS_TO_PRODUCE - 1)
                .setParallelism(this.parallelism)
                .name("source")
                .slotSharingGroup("sourceGroup");
        final DataStream<Long> mapped = source.rebalance()
                .map(new NumberCounterMap())
                .setParallelism(this.parallelism)
                .name("map")
                .slotSharingGroup("mapGroup");
        addSink(mapped);
    }

    /**
     * Defines the job "{@link TestingSourceFunc} source -> sink" with a bounded, parallel source.
     *
     * @param streamExecutionEnvironment the environment the job is defined on
     */
    private void setupJobWithSlowSourceFunction(StreamExecutionEnvironment streamExecutionEnvironment) {
        final DataStream<Long> source = new DataStreamSource<>(
                        streamExecutionEnvironment,
                        BasicTypeInfo.LONG_TYPE_INFO,
                        new StreamSource<>(new TestingSourceFunc()),
                        true,
                        "source",
                        Boundedness.BOUNDED)
                .setParallelism(this.parallelism)
                .slotSharingGroup("sourceGroup");
        addSink(source);
    }

    /**
     * Defines the job "{@link TestingInputFormat} source -> sink" with a bounded, parallel source.
     *
     * @param streamExecutionEnvironment the environment the job is defined on
     */
    private void setupJobWithSlowInputFormatSource(StreamExecutionEnvironment streamExecutionEnvironment) {
        final DataStream<Long> source = new DataStreamSource<>(
                        streamExecutionEnvironment,
                        BasicTypeInfo.LONG_TYPE_INFO,
                        new StreamSource<>(
                                new InputFormatSourceFunction<>(new TestingInputFormat(), TypeInformation.of(Long.class))),
                        true,
                        "source",
                        Boundedness.BOUNDED)
                .setParallelism(this.parallelism)
                .slotSharingGroup("sourceGroup");
        addSink(source);
    }

    /**
     * Defines the job "{@link TestingNumberSequenceSource} -> sink" without watermarks.
     *
     * @param env the environment the job is defined on
     */
    private void setupJobWithSlowNewSource(StreamExecutionEnvironment env) {
        final DataStream<Long> source =
                env.fromSource(new TestingNumberSequenceSource(), WatermarkStrategy.noWatermarks(), "source");
        addSink(source);
    }

    /**
     * Rebalances the stream and attaches a {@link NumberCounterSink} named "sink" in its own
     * slot sharing group.
     *
     * @param dataStream the stream whose records are counted
     */
    private void addSink(DataStream<Long> dataStream) {
        final DataStream<Long> rebalanced = dataStream.rebalance();
        rebalanced
                .addSink(new NumberCounterSink())
                .setParallelism(this.parallelism)
                .name("sink")
                .slotSharingGroup("sinkGroup");
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Makes the calling thread sleep for {@code Integer.MAX_VALUE} milliseconds if the slow-task
     * counter was still positive before this call decremented it; otherwise returns immediately.
     *
     * @throws RuntimeException if the sleep is interrupted; the interrupt flag is restored and
     *     the {@link InterruptedException} is kept as the cause
     */
    public static void maybeSleep() {
        if (slowTaskCounter.getAndDecrement() <= 0) {
            return;
        }
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still see the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while simulating a slow task", e);
        }
    }

    /** Compiler-generated accessor that forwards to {@link #maybeSleep()}. */
    static /* synthetic */ void access$600() {
        SpeculativeSchedulerITCase.maybeSleep();
    }
}
