package org.apache.flink.test.recovery;

import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Batch (DataSet API) variant of the TaskManager process failure recovery test.
 *
 * <p>The test is parameterized over {@link ExecutionMode#PIPELINED} and {@link
 * ExecutionMode#BATCH}. The job sums the sequence {@code 1..100000} and coordinates with the
 * surrounding test harness through marker files in a shared directory: each mapper subtask
 * creates a {@code ready_<subtaskIndex>} file when it sees its first record and then processes
 * records slowly until a {@code proceed} file shows up in the same directory.
 *
 * <p>NOTE(review): the process-killing and marker-file creation logic is presumably implemented in
 * {@link AbstractTaskManagerProcessFailureRecoveryTest}; it is not visible from this class, so
 * confirm the handshake contract there.
 */
@ExtendWith(ParameterizedTestExtension.class)
public class TaskManagerProcessFailureBatchRecoveryITCase extends AbstractTaskManagerProcessFailureRecoveryTest {

    /** Execution mode applied to the job's execution config for the current parameterized run. */
    private final ExecutionMode executionMode;

    /**
     * Creates the test instance for one parameter set.
     *
     * @param executionMode the execution mode the submitted job is configured with
     */
    public TaskManagerProcessFailureBatchRecoveryITCase(ExecutionMode executionMode) {
        this.executionMode = executionMode;
    }

    /**
     * Supplies the parameter sets for the parameterized runs.
     *
     * @return one single-element parameter array per execution mode under test
     */
    @Parameters(name = "executionMode={0}")
    public static Collection<Object[]> executionMode() {
        return Arrays.asList(new Object[][] {{ExecutionMode.PIPELINED}, {ExecutionMode.BATCH}});
    }

    /**
     * Submits the summing job to the remote environment and verifies its result.
     *
     * @param configuration client configuration passed to the remote execution environment
     * @param file directory in which the {@code ready_*} and {@code proceed} marker files live
     * @throws Exception if job submission or execution fails
     */
    @Override
    public void testTaskManagerFailure(Configuration configuration, final File file) throws Exception {
        ExecutionEnvironment env =
                ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration, new String[0]);
        env.setParallelism(4);
        // Allow exactly one restart attempt (delay argument: 5000) so the job can recover once.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 5000L));
        env.getConfig().setExecutionMode(this.executionMode);

        final long numElements = 100000L;
        // Gauss sum of 1..numElements; evaluates to 5000050000 for numElements = 100000.
        final long expectedSum = numElements * (numElements + 1L) / 2L;

        long actualSum = env.generateSequence(1L, numElements)
                // Redistribute the records across the parallel mapper instances.
                .rebalance()
                .map(new RichMapFunction<Long, Long>() {
                    private final File proceedFile = new File(file, "proceed");
                    private boolean markerCreated = false;
                    private boolean checkForProceedFile = true;

                    @Override
                    public Long map(Long value) throws Exception {
                        // Announce this subtask exactly once, on its first record.
                        if (!this.markerCreated) {
                            int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
                            AbstractTaskManagerProcessFailureRecoveryTest.touchFile(
                                    new File(file, "ready_" + subtaskIndex));
                            this.markerCreated = true;
                        }

                        // Until the proceed marker exists, throttle to one record per 100 ms;
                        // once it has been seen, stop polling the file system.
                        if (this.checkForProceedFile) {
                            if (this.proceedFile.exists()) {
                                this.checkForProceedFile = false;
                            } else {
                                Thread.sleep(100L);
                            }
                        }
                        return value;
                    }
                })
                .reduce(new ReduceFunction<Long>() {
                    @Override
                    public Long reduce(Long left, Long right) {
                        return left + right;
                    }
                })
                .collect()
                .get(0);

        // AssertJ convention: the value under test goes into assertThat(), the expectation into
        // isEqualTo(), so a failure message labels the two values correctly.
        Assertions.assertThat(actualSum).isEqualTo(expectedSum);
    }
}
