/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import java.io.File;
import java.util.Iterator;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.test.recovery.AbstractTaskManagerProcessFailureRecoveryTest;
import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Batch-mode variant of the TaskManager process failure recovery test.
 *
 * <p>The job submitted by {@link #testTaskManagerFailure(Configuration, File)} sums the sequence
 * {@code 1..numElements} and coordinates with the outside world through marker files in the
 * coordination directory: every mapper subtask creates a {@code ready_<subtaskIndex>} file when
 * it sees its first record and then throttles itself until a {@code proceed} file shows up.
 *
 * <p>NOTE(review): presumably the base class waits for the {@code ready_*} markers, kills a
 * TaskManager process and then creates {@code proceed} -- confirm against {@link
 * AbstractTaskManagerProcessFailureRecoveryTest}.
 */
@ExtendWith(NoOpTestExtension.class)
public class TaskManagerProcessFailureBatchRecoveryITCase
        extends AbstractTaskManagerProcessFailureRecoveryTest {

    /**
     * Runs a batch job against the remote cluster at {@code localhost:1337} and verifies that the
     * sum of all emitted elements is correct.
     *
     * @param configuration client configuration handed to the remote environment
     * @param coordinateDir directory holding the {@code ready_*} and {@code proceed} marker files
     * @throws Exception if job submission, execution or result collection fails
     */
    @Override
    public void testTaskManagerFailure(Configuration configuration, final File coordinateDir)
            throws Exception {
        final int parallelism = 4;
        final long numElements = 100000L;

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createRemoteEnvironment("localhost", 1337, configuration);
        env.setParallelism(parallelism);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        SingleOutputStreamOperator<Long> result =
                env.fromSequence(1L, numElements)
                        // make sure every mapper subtask receives records
                        .rebalance()
                        .map(
                                new RichMapFunction<Long, Long>() {
                                    private final File proceedFile =
                                            new File(coordinateDir, "proceed");

                                    private boolean markerCreated = false;
                                    private boolean checkForProceedFile = true;

                                    @Override
                                    public Long map(Long value) throws Exception {
                                        // Announce once per subtask that it is processing data.
                                        if (!markerCreated) {
                                            int taskIndex =
                                                    getRuntimeContext()
                                                            .getTaskInfo()
                                                            .getIndexOfThisSubtask();
                                            AbstractTaskManagerProcessFailureRecoveryTest.touchFile(
                                                    new File(coordinateDir, "ready_" + taskIndex));
                                            markerCreated = true;
                                        }

                                        // Slow down (100 ms per record) until the "proceed" file
                                        // exists; afterwards stop checking and run at full speed.
                                        if (checkForProceedFile) {
                                            if (proceedFile.exists()) {
                                                checkForProceedFile = false;
                                            } else {
                                                Thread.sleep(100L);
                                            }
                                        }
                                        return value;
                                    }
                                })
                        .setParallelism(parallelism)
                        .windowAll(GlobalWindows.createWithEndOfStreamTrigger())
                        .reduce(
                                new ReduceFunction<Long>() {
                                    @Override
                                    public Long reduce(Long value1, Long value2) {
                                        return value1 + value2;
                                    }
                                });

        long sum = sumAndClose(result.executeAndCollect());

        // Gauss formula for 1 + 2 + ... + numElements; actual value goes into assertThat(...)
        // so that a failure message reports expected/actual the right way round.
        long expectedSum = numElements * (numElements + 1L) / 2L;
        Assertions.assertThat(sum).isEqualTo(expectedSum);
    }

    /**
     * Sums all remaining elements of the given iterator and closes it afterwards, also when
     * iteration fails.
     *
     * <p>The type bound accepts any iterator that is also {@link AutoCloseable}, which is what
     * {@code executeAndCollect()} hands back; closing it releases the resources held for result
     * collection.
     *
     * @param results closeable iterator over the job results
     * @return the sum of all elements, {@code 0} for an empty iterator
     * @throws Exception if iterating or closing the iterator fails
     */
    private static <I extends Iterator<Long> & AutoCloseable> long sumAndClose(I results)
            throws Exception {
        try (I iterator = results) {
            long sum = 0L;
            while (iterator.hasNext()) {
                sum += iterator.next();
            }
            return sum;
        }
    }
}

