package org.apache.flink.test.recovery;

import akka.actor.ActorSystem;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
import org.apache.flink.runtime.testutils.TaskManagerProcess;
import org.apache.flink.runtime.testutils.TestJvmProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/**
 * High-availability chaos test. It runs a checkpointed streaming job on a cluster of separately
 * started JobManager and TaskManager JVM processes configured for ZooKeeper recovery mode, and it
 * periodically kills either the leading JobManager or a random TaskManager while the job runs.
 *
 * <p>The job consists of {@link CheckpointedSequenceSource} subtasks that each emit 0..n and a
 * single {@link CountingSink} that throws if the summed result differs from the expected value.
 * After the chaos phase, the test waits for the job to finish. It then verifies that ZooKeeper
 * holds no job graphs, checkpoints or checkpoint counters, and that the file system checkpoint
 * and recovery directories are empty.
 */
@Ignore
public class ChaosMonkeyITCase {

    private static final Logger LOG = LoggerFactory.getLogger(ChaosMonkeyITCase.class);

    /** ZooKeeper test environment shared by all processes (argument 1 presumably = one server). */
    private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);

    /** Temporary base directory for state backend data and the coordination marker files. */
    private static final File FileStateBackendBasePath;

    /** Marker path prefix; each source subtask touches prefix + subtask index per completed checkpoint. */
    private static final File CheckpointCompletedCoordination;

    /** Marker file whose existence disables the per-element throttling in the sources. */
    private static final File ProceedCoordination;

    private static final String COMPLETED_PREFIX = "completed_";

    /** Sentinel element each source emits after the last value of its sequence. */
    private static final long LastElement = -1;

    // ---- Test configuration ---------------------------------------------------------------

    private static final int NUMBER_OF_JOB_MANAGERS = 3;
    private static final int NUMBER_OF_TASK_MANAGERS = 3;
    private static final int NUMBER_OF_SLOTS_PER_TASK_MANAGER = 2;

    /** Every source subtask emits 0, 1, ..., SEQUENCE_END (inclusive). */
    private static final int SEQUENCE_END = 5000;

    /** Source parallelism: one subtask per available slot. */
    private static final int PARALLELISM = NUMBER_OF_TASK_MANAGERS * NUMBER_OF_SLOTS_PER_TASK_MANAGER;

    private static final int CHECKPOINTING_INTERVAL_MS = 2000;
    private static final int TOTAL_NUMBER_OF_KILLS = 5;

    static {
        try {
            FileStateBackendBasePath = CommonTestUtils.createTempDirectory();
            CheckpointCompletedCoordination = new File(FileStateBackendBasePath, COMPLETED_PREFIX);
            ProceedCoordination = new File(FileStateBackendBasePath, "proceed");
        } catch (IOException e) {
            throw new RuntimeException("Error in test setup. Could not create directory.", e);
        }
    }

    private final Random rand = new Random();

    /** Id passed to the next started JobManager process; incremented on every start. */
    private int jobManagerPid;

    /** Id passed to the next started TaskManager process; incremented on every start. */
    private int taskManagerPid;

    /**
     * Parallel source that emits {@code 0, 1, ..., end} followed by {@code LastElement}.
     *
     * <p>While the proceed marker file does not exist, the source sleeps 50 ms before each element,
     * so the job keeps running during the chaos phase. The next value to emit is the checkpointed
     * state. Every completed checkpoint is signalled by touching a per-subtask marker file.
     */
    public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long>
            implements Checkpointed<Long>, CheckpointNotifier {

        private static final long serialVersionUID = 0;

        /** Last value of the sequence (inclusive). */
        private final long end;

        /** Marker file path prefix; the subtask index is appended to it. */
        private final String completedCheckpointMarkerFilePath;

        /** Once this file exists, elements are emitted without throttling. */
        private final File proceedFile;

        /** Next value to emit; this is the checkpointed state. */
        private long current = 0;

        private volatile boolean isRunning = true;

        /**
         * @param end last value to emit (inclusive); must not be negative
         * @param completedCheckpointMarkerFilePath prefix of the completed-checkpoint marker files
         * @param proceedFilePath path of the file that disables throttling once it exists
         */
        public CheckpointedSequenceSource(
                long end, String completedCheckpointMarkerFilePath, String proceedFilePath) {
            Preconditions.checkArgument(end >= 0, "Negative final count");
            this.end = end;
            this.completedCheckpointMarkerFilePath = completedCheckpointMarkerFilePath;
            this.proceedFile = new File(proceedFilePath);
        }

        @Override
        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            while (isRunning) {
                // Throttle until the test signals that the chaos phase is over.
                if (!proceedFile.exists()) {
                    Thread.sleep(50L);
                }

                // Emit and advance while holding the checkpoint lock, so emitting an element and
                // updating the checkpointed position happen together.
                synchronized (ctx.getCheckpointLock()) {
                    if (current <= end) {
                        ctx.collect(current++);
                    } else {
                        ctx.collect(LastElement);
                        return;
                    }
                }
            }
        }

        @Override
        public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            LOG.info("Snapshotting state {} @ ID {}.", current, checkpointId);
            return current;
        }

        @Override
        public void restoreState(Long state) {
            LOG.info("Restoring state {}/{}", state, end);
            current = state;
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            LOG.info("Checkpoint {} completed.", checkpointId);
            TestJvmProcess.touchFile(new File(
                    completedCheckpointMarkerFilePath + getRuntimeContext().getIndexOfThisSubtask()));
        }
    }

    /**
     * Sink that sums all received values except {@code LastElement}.
     *
     * <p>Once it has received {@code parallelism} sentinel elements (one per source subtask), it
     * throws unless the sum equals {@code expectedFinalCount}. The running sum and the number of
     * received sentinels are checkpointed.
     */
    public static class CountingSink extends RichSinkFunction<Long>
            implements Checkpointed<CountingSink>, CheckpointNotifier {

        private static final Logger LOG = LoggerFactory.getLogger(CountingSink.class);

        private static final long serialVersionUID = 0;

        /** Number of upstream source subtasks, i.e. number of sentinels to expect. */
        private final int parallelism;

        private final long expectedFinalCount;

        private long current;

        private int numberOfReceivedLastElements;

        public CountingSink(int parallelism, long expectedFinalCount) {
            this.expectedFinalCount = expectedFinalCount;
            this.parallelism = parallelism;
        }

        @Override
        public void invoke(Long value) throws Exception {
            if (value.longValue() != LastElement) {
                current += value.longValue();
                return;
            }

            numberOfReceivedLastElements++;

            if (numberOfReceivedLastElements == parallelism) {
                // All sources are done: the sum must be complete and exact.
                if (current != expectedFinalCount) {
                    throw new Exception("Unexpected final result " + current);
                }
                LOG.info("Final result " + current);
            } else if (numberOfReceivedLastElements > parallelism) {
                throw new IllegalStateException("Received more elements than parallelism.");
            }
        }

        @Override
        public CountingSink snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            LOG.info("Snapshotting state {}:{} @ ID {}.",
                    new Object[]{current, numberOfReceivedLastElements, checkpointId});
            // NOTE(review): returns the live instance; this relies on the state being serialized
            // synchronously during the snapshot — confirm for the state backend in use.
            return this;
        }

        @Override
        public void restoreState(CountingSink state) {
            LOG.info("Restoring state {}:{}", state.current, state.numberOfReceivedLastElements);
            current = state.current;
            numberOfReceivedLastElements = state.numberOfReceivedLastElements;
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            LOG.info("Checkpoint {} completed.", checkpointId);
        }
    }

    /** Shuts down ZooKeeper and always deletes the temporary base directory afterwards. */
    @AfterClass
    public static void tearDown() throws Exception {
        try {
            if (ZooKeeper != null) {
                ZooKeeper.shutdown();
            }
        } finally {
            if (FileStateBackendBasePath != null) {
                FileUtils.deleteDirectory(FileStateBackendBasePath);
            }
        }
    }

    /**
     * Starts the cluster, submits the job and waits for a first completed checkpoint. It then
     * kills a process every {@code killEvery}, {@link #TOTAL_NUMBER_OF_KILLS} times. Afterwards it
     * waits for the job to finish and checks that the recovery state was cleaned up.
     *
     * <p>On any failure the process logs are printed. All started processes, the leader retrieval
     * service and the actor system are always shut down.
     */
    @Test
    public void testChaosMonkey() throws Exception {
        // Upper bound for the whole test.
        final FiniteDuration testDuration = new FiniteDuration(10L, TimeUnit.MINUTES);

        // The job runs for this long before the next process is killed. After each kill, the test
        // waits for the cluster to be complete again before the next count down starts.
        final FiniteDuration killEvery = new FiniteDuration(30L, TimeUnit.SECONDS);

        final Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
                ZooKeeper.getConnectString(), FileStateBackendBasePath.getPath());

        // Akka death watch settings
        config.setString("akka.watch.heartbeat.interval", "1000 ms");
        config.setString("akka.watch.heartbeat.pause", "6 s");
        config.setInteger("akka.watch.threshold", 9);

        if (CHECKPOINTING_INTERVAL_MS >= killEvery.toMillis()) {
            throw new IllegalArgumentException("Relax! You want to kill processes every " + killEvery
                    + ", but the checkpointing interval is " + CHECKPOINTING_INTERVAL_MS / 1000
                    + " seconds. Either decrease the interval or increase the kill interval. "
                    + "Otherwise, the program will not complete any checkpoint.");
        }

        config.setInteger("taskmanager.numberOfTaskSlots", NUMBER_OF_SLOTS_PER_TASK_MANAGER);

        ActorSystem testActorSystem = null;
        LeaderRetrievalService leaderRetrievalService = null;
        List<JobManagerProcess> jobManagerProcesses = new ArrayList<JobManagerProcess>();
        List<TaskManagerProcess> taskManagerProcesses = new ArrayList<TaskManagerProcess>();
        boolean success = false;

        try {
            for (int i = 0; i < NUMBER_OF_JOB_MANAGERS; i++) {
                jobManagerProcesses.add(createAndStartJobManagerProcess(config));
            }
            for (int i = 0; i < NUMBER_OF_TASK_MANAGERS; i++) {
                taskManagerProcesses.add(createAndStartTaskManagerProcess(config));
            }

            testActorSystem = AkkaUtils.createDefaultActorSystem();

            leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(config);
            TestingListener leaderListener = new TestingListener();
            leaderRetrievalService.start(leaderListener);

            Deadline deadline = testDuration.fromNow();

            int leaderIndex = waitForNewLeader(leaderListener, jobManagerProcesses, deadline.timeLeft());
            waitForTaskManagers(NUMBER_OF_TASK_MANAGERS, jobManagerProcesses.get(leaderIndex),
                    testActorSystem, deadline.timeLeft());

            JobGraph jobGraph = createJobGraph(SEQUENCE_END, CheckpointCompletedCoordination.getPath(),
                    ProceedCoordination.getPath(), PARALLELISM, CHECKPOINTING_INTERVAL_MS);

            LOG.info("Submitting job {}", jobGraph.getJobID());
            submitJobGraph(jobGraph, jobManagerProcesses.get(leaderIndex), leaderListener,
                    testActorSystem, deadline.timeLeft());

            LOG.info("Waiting for a checkpoint to complete before kicking off chaos");
            TestJvmProcess.waitForMarkerFiles(FileStateBackendBasePath, COMPLETED_PREFIX,
                    PARALLELISM, deadline.timeLeft().toMillis());
            LOG.info("Checkpoint completed... ready for chaos");

            int jobManagerKills = 0;
            int taskManagerKills = 0;

            for (int kill = 1; kill <= TOTAL_NUMBER_OF_KILLS; kill++) {
                LOG.info("Waiting for {} before next kill ({}/{})",
                        new Object[]{killEvery, kill, TOTAL_NUMBER_OF_KILLS});
                Thread.sleep(killEvery.toMillis());

                LOG.info("Checking job status...");
                JobStatus jobStatus = requestJobStatus(jobGraph.getJobID(),
                        jobManagerProcesses.get(leaderIndex), testActorSystem, deadline.timeLeft());

                if (jobStatus == JobStatus.FINISHED) {
                    // The job completed before all kills happened.
                    LOG.info("Job finished");
                    success = true;
                    return;
                } else if (jobStatus == JobStatus.RUNNING) {
                    LOG.info("Job status is {}", jobStatus);
                } else {
                    LOG.info("Waiting for job status {}", JobStatus.RUNNING);
                    waitForJobRunning(jobGraph.getJobID(), jobManagerProcesses.get(leaderIndex),
                            testActorSystem, deadline.timeLeft());
                }

                if (rand.nextBoolean()) {
                    LOG.info("Killing the leading JobManager");

                    // Register the replacement first so that it is cleaned up on failure; appending
                    // does not shift the index of the current leader.
                    JobManagerProcess newJobManager = createAndStartJobManagerProcess(config);
                    jobManagerProcesses.add(newJobManager);

                    JobManagerProcess leader = jobManagerProcesses.remove(leaderIndex);
                    leader.destroy();
                    jobManagerKills++;
                    LOG.info("Killed {}", leader);

                    leaderIndex = waitForNewLeader(leaderListener, jobManagerProcesses, deadline.timeLeft());
                    waitForTaskManagers(NUMBER_OF_TASK_MANAGERS, jobManagerProcesses.get(leaderIndex),
                            testActorSystem, deadline.timeLeft());
                } else {
                    LOG.info("Killing a random TaskManager");

                    // The replacement is appended after the original TaskManagers, so the random
                    // victim below is always one of the older processes.
                    TaskManagerProcess newTaskManager = createAndStartTaskManagerProcess(config);
                    taskManagerProcesses.add(newTaskManager);

                    // Only kill once the replacement is registered with the leader.
                    waitForTaskManagers(NUMBER_OF_TASK_MANAGERS + 1, jobManagerProcesses.get(leaderIndex),
                            testActorSystem, deadline.timeLeft());

                    TaskManagerProcess victim =
                            taskManagerProcesses.remove(rand.nextInt(NUMBER_OF_TASK_MANAGERS));
                    LOG.info("{} has been chosen. Killing process...", victim);
                    victim.destroy();
                    taskManagerKills++;
                }
            }

            LOG.info("Chaos is over. Total kills: {} ({} job manager + {} task managers). Checking job status...",
                    new Object[]{TOTAL_NUMBER_OF_KILLS, jobManagerKills, taskManagerKills});

            // Let the sources emit the rest of their sequence without throttling.
            TestJvmProcess.touchFile(ProceedCoordination);

            LOG.info("Waiting for job status {}", JobStatus.FINISHED);
            waitForJobFinished(jobGraph.getJobID(), jobManagerProcesses.get(leaderIndex),
                    testActorSystem, deadline.timeLeft());
            LOG.info("Job finished");

            LOG.info("Waiting for job removal");
            waitForJobRemoved(jobGraph.getJobID(), jobManagerProcesses.get(leaderIndex),
                    testActorSystem, deadline.timeLeft());
            LOG.info("Job removed");

            LOG.info("Checking clean recovery state...");
            checkCleanRecoveryState(config);
            LOG.info("Recovery state clean");

            success = true;
        } finally {
            if (!success) {
                LOG.error("Chaos monkey test failed. Dumping process logs.");
                printProcessLogs(jobManagerProcesses, taskManagerProcesses);
            }
            shutdownCluster(jobManagerProcesses, taskManagerProcesses, leaderRetrievalService, testActorSystem);
        }
    }

    /** Prints the logs of all given processes to stdout, TaskManagers first. */
    private static void printProcessLogs(
            List<JobManagerProcess> jobManagers, List<TaskManagerProcess> taskManagers) {
        System.out.println("#################################################");
        System.out.println(" TASK MANAGERS");
        System.out.println("#################################################");
        for (TaskManagerProcess taskManager : taskManagers) {
            taskManager.printProcessLog();
        }

        System.out.println("#################################################");
        System.out.println(" JOB MANAGERS");
        System.out.println("#################################################");
        for (JobManagerProcess jobManager : jobManagers) {
            jobManager.printProcessLog();
        }
    }

    /**
     * Destroys all given processes and stops the leader retrieval service (if started). The actor
     * system (if created) is always shut down, even if an earlier step throws.
     */
    private static void shutdownCluster(
            List<JobManagerProcess> jobManagers,
            List<TaskManagerProcess> taskManagers,
            LeaderRetrievalService leaderRetrievalService,
            ActorSystem actorSystem) throws Exception {
        try {
            for (JobManagerProcess jobManager : jobManagers) {
                jobManager.destroy();
            }
            for (TaskManagerProcess taskManager : taskManagers) {
                taskManager.destroy();
            }
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
        } finally {
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
        }
    }

    /**
     * Builds the test job: {@code parallelism} sequence sources feeding a single counting sink.
     *
     * @param n last value each source emits (inclusive)
     * @param completedCheckpointMarker marker file prefix touched on completed checkpoints
     * @param proceedMarker path of the file that disables source throttling
     * @param parallelism source parallelism
     * @param checkpointingIntervalMs checkpointing interval in milliseconds
     */
    private JobGraph createJobGraph(int n, String completedCheckpointMarker, String proceedMarker,
            int parallelism, int checkpointingIntervalMs) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.enableCheckpointing(checkpointingIntervalMs);

        // Each source sums to n * (n + 1) / 2. Computed in long arithmetic to avoid int overflow
        // for larger n or parallelism.
        long expectedFinalCount = (long) parallelism * n * (n + 1) / 2;

        env.addSource(new CheckpointedSequenceSource(n, completedCheckpointMarker, proceedMarker))
                .addSink(new CountingSink(parallelism, expectedFinalCount))
                .setParallelism(1);

        return env.getStreamGraph().getJobGraph();
    }

    /** Submits the job in detached mode to the given JobManager using the current leader session. */
    private void submitJobGraph(JobGraph jobGraph, JobManagerProcess jobManager, TestingListener leaderListener,
            ActorSystem actorSystem, FiniteDuration timeout) throws Exception {
        new AkkaActorGateway(jobManager.getActorRef(actorSystem, timeout), leaderListener.getLeaderSessionID())
                .tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
    }

    /**
     * Asserts that ZooKeeper holds no job graphs, checkpoints or checkpoint counters. It also
     * asserts that the file system checkpoint directory and the recovery storage directory exist
     * and are empty.
     */
    private void checkCleanRecoveryState(Configuration config) throws Exception {
        LOG.info("Checking " + ZooKeeper.getClientNamespace() + "/jobgraphs");
        List<String> jobGraphs = ZooKeeper.getChildren("/jobgraphs");
        Assert.assertEquals("Unclean job graphs: " + jobGraphs, 0L, jobGraphs.size());

        LOG.info("Checking " + ZooKeeper.getClientNamespace() + "/checkpoints");
        List<String> checkpoints = ZooKeeper.getChildren("/checkpoints");
        Assert.assertEquals("Unclean checkpoints: " + checkpoints, 0L, checkpoints.size());

        LOG.info("Checking " + ZooKeeper.getClientNamespace() + "/checkpoint-counter");
        List<String> checkpointCounter = ZooKeeper.getChildren("/checkpoint-counter");
        Assert.assertEquals("Unclean checkpoint counter: " + checkpointCounter, 0L, checkpointCounter.size());

        LOG.info("ZooKeeper state is clean");

        LOG.info("Checking file system backend state...");
        assertEmptyDirectory(new File(config.getString("state.backend.fs.checkpointdir", "")),
                "Unclean file system checkpoints: ");
        assertEmptyDirectory(new File(config.getString("recovery.zookeeper.storageDir", "")),
                "Unclean file system recovery state: ");
    }

    /** Fails if {@code directory} cannot be listed, or if it contains any entries. */
    private static void assertEmptyDirectory(File directory, String uncleanMessagePrefix) {
        LOG.info("Checking " + directory);
        File[] files = directory.listFiles();
        if (files == null) {
            Assert.fail(directory + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles()));
        } else {
            Assert.assertEquals(uncleanMessagePrefix + Arrays.toString(files), 0L, files.length);
        }
    }

    /**
     * Polls the JobManager's archive actor every 100 ms until it returns something other than
     * {@code JobNotFound} for the job, or until {@code timeout} expires. On timeout it logs a
     * warning and returns.
     */
    private void waitForJobRemoved(JobID jobId, JobManagerProcess jobManager, ActorSystem actorSystem,
            FiniteDuration timeout) throws Exception {
        AkkaActorGateway jobManagerGateway =
                new AkkaActorGateway(jobManager.getActorRef(actorSystem, timeout), (UUID) null);
        JobManagerMessages.ResponseArchive archiveResponse = (JobManagerMessages.ResponseArchive) Await.result(
                jobManagerGateway.ask(JobManagerMessages.getRequestArchive(), timeout), timeout);
        AkkaActorGateway archiveGateway = new AkkaActorGateway(archiveResponse.actor(), (UUID) null);

        Deadline deadline = timeout.fromNow();
        while (deadline.hasTimeLeft()) {
            if (!(JobManagerActorTestUtils.requestJobStatus(jobId, archiveGateway, deadline.timeLeft())
                    instanceof JobManagerMessages.JobNotFound)) {
                return;
            }
            Thread.sleep(100L);
        }
        LOG.warn("Job {} was not found in the archive within {}", jobId, timeout);
    }

    /**
     * Requests the job status from the given JobManager. A {@code JobNotFound} response is reported
     * as {@link JobStatus#RESTARTING}. This is presumably because a newly elected leader may not
     * have recovered the job yet.
     *
     * @throws IllegalStateException on any other response type
     */
    private JobStatus requestJobStatus(JobID jobId, JobManagerProcess jobManager, ActorSystem actorSystem,
            FiniteDuration timeout) throws Exception {
        AkkaActorGateway gateway = new AkkaActorGateway(jobManager.getActorRef(actorSystem, timeout), (UUID) null);
        JobManagerMessages.JobStatusResponse response =
                JobManagerActorTestUtils.requestJobStatus(jobId, gateway, timeout);

        if (response instanceof JobManagerMessages.CurrentJobStatus) {
            return ((JobManagerMessages.CurrentJobStatus) response).status();
        }
        if (response instanceof JobManagerMessages.JobNotFound) {
            return JobStatus.RESTARTING;
        }
        throw new IllegalStateException("Unexpected response from JobManager");
    }

    /** Waits until the job reaches {@link JobStatus#RUNNING} on the given JobManager. */
    private void waitForJobRunning(JobID jobId, JobManagerProcess jobManager, ActorSystem actorSystem,
            FiniteDuration timeout) throws Exception {
        JobManagerActorTestUtils.waitForJobStatus(jobId, JobStatus.RUNNING,
                new AkkaActorGateway(jobManager.getActorRef(actorSystem, timeout), (UUID) null), timeout);
    }

    /** Waits until the job reaches {@link JobStatus#FINISHED} on the given JobManager. */
    private void waitForJobFinished(JobID jobId, JobManagerProcess jobManager, ActorSystem actorSystem,
            FiniteDuration timeout) throws Exception {
        JobManagerActorTestUtils.waitForJobStatus(jobId, JobStatus.FINISHED,
                new AkkaActorGateway(jobManager.getActorRef(actorSystem, timeout), (UUID) null), timeout);
    }

    /** Waits until {@code numberOfTaskManagers} TaskManagers are registered with the given JobManager. */
    private void waitForTaskManagers(int numberOfTaskManagers, JobManagerProcess jobManager,
            ActorSystem actorSystem, FiniteDuration timeout) throws Exception {
        LOG.info("Waiting for {} task managers to connect to leading {}", numberOfTaskManagers, jobManager);
        JobManagerActorTestUtils.waitForTaskManagers(numberOfTaskManagers,
                new AkkaActorGateway(jobManager.getActorRef(actorSystem, timeout), (UUID) null), timeout);
        LOG.info("All task managers connected");
    }

    /**
     * Waits for a leader notification, then returns the index in {@code jobManagers} of the process
     * whose Akka URL equals the announced leader address.
     *
     * @throws IllegalStateException if no listed process matches the leader address
     */
    private int waitForNewLeader(TestingListener leaderListener, List<JobManagerProcess> jobManagers,
            FiniteDuration timeout) throws Exception {
        LOG.info("Waiting for new leader notification");
        leaderListener.waitForNewLeader(timeout.toMillis());

        LOG.info("Leader: {}:{}", leaderListener.getAddress(), leaderListener.getLeaderSessionID());
        String leaderAddress = leaderListener.getAddress();

        for (int i = 0; i < jobManagers.size(); i++) {
            if (jobManagers.get(i).getJobManagerAkkaURL().equals(leaderAddress)) {
                return i;
            }
        }
        throw new IllegalStateException("Failed to determine which process is leader");
    }

    /** Creates and starts a JobManager process with the next JobManager id. */
    private JobManagerProcess createAndStartJobManagerProcess(Configuration config) throws Exception {
        JobManagerProcess jobManager = new JobManagerProcess(jobManagerPid++, config);
        jobManager.createAndStart();
        LOG.info("Created and started {}.", jobManager);
        return jobManager;
    }

    /** Creates and starts a TaskManager process with the next TaskManager id. */
    private TaskManagerProcess createAndStartTaskManagerProcess(Configuration config) throws Exception {
        TaskManagerProcess taskManager = new TaskManagerProcess(taskManagerPid++, config);
        taskManager.createAndStart();
        LOG.info("Created and started {}.", taskManager);
        return taskManager;
    }
}
