package org.apache.flink.test.recovery;

import akka.actor.ActorSystem;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import scala.Option;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.class */
public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
    private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
    private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
    private static final File FileStateBackendBasePath;
    protected static final String READY_MARKER_FILE_PREFIX = "ready_";
    protected static final String FINISH_MARKER_FILE_PREFIX = "finish_";
    protected static final String PROCEED_MARKER_FILE = "proceed";
    protected static final int PARALLELISM = 4;
    private final ExecutionMode executionMode;

    @AfterClass
    public static void tearDown() throws Exception {
        if (ZooKeeper != null) {
            ZooKeeper.shutdown();
        }
        if (FileStateBackendBasePath != null) {
            FileUtils.deleteDirectory(FileStateBackendBasePath);
        }
    }

    @Before
    public void cleanUp() throws Exception {
        ZooKeeper.deleteAll();
        FileUtils.cleanDirectory(FileStateBackendBasePath);
    }

    public JobManagerHAProcessFailureBatchRecoveryITCase(ExecutionMode executionMode) {
        this.executionMode = executionMode;
    }

    @Parameterized.Parameters
    public static Collection<Object[]> executionMode() {
        return Arrays.asList(new Object[]{ExecutionMode.PIPELINED}, new Object[]{ExecutionMode.BATCH});
    }

    public void testJobManagerFailure(String str, final File file) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString("recovery.mode", "ZOOKEEPER");
        configuration.setString("recovery.zookeeper.quorum", str);
        ExecutionEnvironment createRemoteEnvironment = ExecutionEnvironment.createRemoteEnvironment("leader", 1, configuration, new String[0]);
        createRemoteEnvironment.setParallelism(PARALLELISM);
        createRemoteEnvironment.setNumberOfExecutionRetries(1);
        createRemoteEnvironment.getConfig().setExecutionMode(this.executionMode);
        createRemoteEnvironment.getConfig().disableSysoutLogging();
        createRemoteEnvironment.generateSequence(1L, 100000L).rebalance().map(new RichMapFunction<Long, Long>() { // from class: org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase.3
            private final File proceedFile;
            private boolean markerCreated = false;
            private boolean checkForProceedFile = true;

            {
                this.proceedFile = new File(file, JobManagerHAProcessFailureBatchRecoveryITCase.PROCEED_MARKER_FILE);
            }

            public Long map(Long l) throws Exception {
                if (!this.markerCreated) {
                    AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(file, JobManagerHAProcessFailureBatchRecoveryITCase.READY_MARKER_FILE_PREFIX + getRuntimeContext().getIndexOfThisSubtask()));
                    this.markerCreated = true;
                }
                if (this.checkForProceedFile) {
                    if (this.proceedFile.exists()) {
                        this.checkForProceedFile = false;
                    } else {
                        Thread.sleep(100L);
                    }
                }
                return l;
            }
        }).reduce(new ReduceFunction<Long>() { // from class: org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase.2
            public Long reduce(Long l, Long l2) {
                return Long.valueOf(l.longValue() + l2.longValue());
            }
        }).flatMap(new RichFlatMapFunction<Long, Long>() { // from class: org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase.1
            public void flatMap(Long l, Collector<Long> collector) throws Exception {
                Assert.assertEquals(5000050000L, l.longValue());
                AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(file, JobManagerHAProcessFailureBatchRecoveryITCase.FINISH_MARKER_FILE_PREFIX + getRuntimeContext().getIndexOfThisSubtask()));
            }

            public /* bridge */ /* synthetic */ void flatMap(Object obj, Collector collector) throws Exception {
                flatMap((Long) obj, (Collector<Long>) collector);
            }
        }).output(new DiscardingOutputFormat());
        createRemoteEnvironment.execute();
    }

    @Test
    public void testJobManagerProcessFailure() throws Exception {
        Assert.assertEquals(4L, 4L);
        JobManagerProcess[] jobManagerProcessArr = new JobManagerProcess[2];
        ActorSystem[] actorSystemArr = new ActorSystem[2];
        LeaderRetrievalService leaderRetrievalService = null;
        final File file = null;
        try {
            try {
                Deadline fromNow = TestTimeOut.fromNow();
                file = CommonTestUtils.createTempDirectory();
                Configuration createZooKeeperRecoveryModeConfig = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());
                jobManagerProcessArr[0] = new JobManagerProcess(0, createZooKeeperRecoveryModeConfig);
                jobManagerProcessArr[0].startProcess();
                createZooKeeperRecoveryModeConfig.setInteger("taskmanager.memory.size", PARALLELISM);
                createZooKeeperRecoveryModeConfig.setInteger("taskmanager.network.numberOfBuffers", 100);
                createZooKeeperRecoveryModeConfig.setInteger("taskmanager.numberOfTaskSlots", 2);
                for (int i = 0; i < 2; i++) {
                    actorSystemArr[i] = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
                    TaskManager.startTaskManagerComponentsAndActor(createZooKeeperRecoveryModeConfig, ResourceID.generate(), actorSystemArr[i], "localhost", Option.empty(), Option.empty(), false, TaskManager.class);
                }
                ActorSystem createActorSystem = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
                jobManagerProcessArr[0].getActorRef(createActorSystem, fromNow.timeLeft());
                TestingListener testingListener = new TestingListener();
                leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(createZooKeeperRecoveryModeConfig);
                leaderRetrievalService.start(testingListener);
                testingListener.waitForNewLeader(fromNow.timeLeft().toMillis());
                JobManagerActorTestUtils.waitForTaskManagers(2, new AkkaActorGateway(AkkaUtils.getActorRef(testingListener.getAddress(), createActorSystem, fromNow.timeLeft()), testingListener.getLeaderSessionID()), fromNow.timeLeft());
                final Throwable[] thArr = new Throwable[1];
                Thread thread = new Thread("Program Trigger") { // from class: org.apache.flink.test.recovery.JobManagerHAProcessFailureBatchRecoveryITCase.4
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        try {
                            JobManagerHAProcessFailureBatchRecoveryITCase.this.testJobManagerFailure(JobManagerHAProcessFailureBatchRecoveryITCase.ZooKeeper.getConnectString(), file);
                        } catch (Throwable th) {
                            th.printStackTrace();
                            thArr[0] = th;
                        }
                    }
                };
                thread.start();
                AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(file, READY_MARKER_FILE_PREFIX, PARALLELISM, fromNow.timeLeft().toMillis());
                jobManagerProcessArr[0].destroy();
                jobManagerProcessArr[1] = new JobManagerProcess(1, createZooKeeperRecoveryModeConfig);
                jobManagerProcessArr[1].startProcess();
                jobManagerProcessArr[1].getActorRef(createActorSystem, fromNow.timeLeft());
                AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(file, PROCEED_MARKER_FILE));
                thread.join(fromNow.timeLeft().toMillis());
                AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(file, FINISH_MARKER_FILE_PREFIX, 1, fromNow.timeLeft().toMillis());
                Assert.assertFalse("The program did not finish in time", thread.isAlive());
                if (thArr[0] != null) {
                    Throwable th = thArr[0];
                    th.printStackTrace();
                    Assert.fail("The program encountered a " + th.getClass().getSimpleName() + " : " + th.getMessage());
                }
                for (int i2 = 0; i2 < 2; i2++) {
                    if (actorSystemArr[i2] != null) {
                        actorSystemArr[i2].shutdown();
                    }
                }
                if (leaderRetrievalService != null) {
                    leaderRetrievalService.stop();
                }
                for (JobManagerProcess jobManagerProcess : jobManagerProcessArr) {
                    if (jobManagerProcess != null) {
                        jobManagerProcess.destroy();
                    }
                }
                if (file != null) {
                    try {
                        FileUtils.deleteDirectory(file);
                    } catch (Throwable th2) {
                    }
                }
            } catch (Throwable th3) {
                th3.printStackTrace();
                for (JobManagerProcess jobManagerProcess2 : jobManagerProcessArr) {
                    if (jobManagerProcess2 != null) {
                        jobManagerProcess2.printProcessLog();
                    }
                }
                throw th3;
            }
        } catch (Throwable th4) {
            for (int i3 = 0; i3 < 2; i3++) {
                if (actorSystemArr[i3] != null) {
                    actorSystemArr[i3].shutdown();
                }
            }
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            for (JobManagerProcess jobManagerProcess3 : jobManagerProcessArr) {
                if (jobManagerProcess3 != null) {
                    jobManagerProcess3.destroy();
                }
            }
            if (file != null) {
                try {
                    FileUtils.deleteDirectory(file);
                } catch (Throwable th5) {
                }
            }
            throw th4;
        }
    }

    static {
        try {
            FileStateBackendBasePath = CommonTestUtils.createTempDirectory();
        } catch (IOException e) {
            throw new RuntimeException("Error in test setup. Could not create directory.", e);
        }
    }
}
