/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
import org.apache.flink.runtime.testutils.TaskManagerProcess;
import org.apache.flink.runtime.testutils.TestJvmProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

@Ignore
public class ChaosMonkeyITCase {
    private static final Logger LOG = LoggerFactory.getLogger(ChaosMonkeyITCase.class);
    private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
    private static final File FileStateBackendBasePath;
    private static final File CheckpointCompletedCoordination;
    private static final File ProceedCoordination;
    private static final String COMPLETED_PREFIX = "completed_";
    private static final long LastElement = -1L;
    private final Random rand = new Random();
    private int jobManagerPid;
    private int taskManagerPid;

    @AfterClass
    public static void tearDown() throws Exception {
        if (ZooKeeper != null) {
            ZooKeeper.shutdown();
        }
        if (FileStateBackendBasePath != null) {
            FileUtils.deleteDirectory((File)FileStateBackendBasePath);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testChaosMonkey() throws Exception {
        int numberOfJobManagers = 3;
        int numberOfTaskManagers = 3;
        int numberOfSlotsPerTaskManager = 2;
        int n = 5000;
        int parallelism = 6;
        FiniteDuration testDuration = new FiniteDuration(10L, TimeUnit.MINUTES);
        FiniteDuration killEvery = new FiniteDuration(30L, TimeUnit.SECONDS);
        int checkpointingIntervalMs = 2000;
        int totalNumberOfKills = 5;
        Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig((String)ZooKeeper.getConnectString(), (String)FileStateBackendBasePath.getPath());
        config.setString("akka.watch.heartbeat.interval", "1000 ms");
        config.setString("akka.watch.heartbeat.pause", "6 s");
        config.setInteger("akka.watch.threshold", 9);
        if (2000L >= killEvery.toMillis()) {
            throw new IllegalArgumentException("Relax! You want to kill processes every " + killEvery + ", but the checkpointing interval is " + 2 + " seconds. Either decrease the interval or " + "increase the kill interval. Otherwise, the program will not complete any " + "checkpoint.");
        }
        config.setInteger("taskmanager.numberOfTaskSlots", 2);
        ActorSystem testActorSystem = null;
        ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
        ArrayList<JobManagerProcess> jobManagerProcesses = new ArrayList<JobManagerProcess>();
        ArrayList<TaskManagerProcess> taskManagerProcesses = new ArrayList<TaskManagerProcess>();
        try {
            int i;
            for (i = 0; i < 3; ++i) {
                jobManagerProcesses.add(this.createAndStartJobManagerProcess(config));
            }
            for (i = 0; i < 3; ++i) {
                taskManagerProcesses.add(this.createAndStartTaskManagerProcess(config));
            }
            testActorSystem = AkkaUtils.createDefaultActorSystem();
            leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService((Configuration)config);
            TestingListener leaderListener = new TestingListener();
            leaderRetrievalService.start((LeaderRetrievalListener)leaderListener);
            Deadline deadline = testDuration.fromNow();
            int leaderIndex = this.waitForNewLeader(leaderListener, jobManagerProcesses, deadline.timeLeft());
            this.waitForTaskManagers(3, (JobManagerProcess)jobManagerProcesses.get(leaderIndex), testActorSystem, deadline.timeLeft());
            JobGraph jobGraph = this.createJobGraph(5000, CheckpointCompletedCoordination.getPath(), ProceedCoordination.getPath(), 6, 2000);
            LOG.info("Submitting job {}", (Object)jobGraph.getJobID());
            this.submitJobGraph(jobGraph, (JobManagerProcess)jobManagerProcesses.get(leaderIndex), leaderListener, testActorSystem, deadline.timeLeft());
            LOG.info("Waiting for a checkpoint to complete before kicking off chaos");
            TestJvmProcess.waitForMarkerFiles((File)FileStateBackendBasePath, (String)COMPLETED_PREFIX, (int)6, (long)deadline.timeLeft().toMillis());
            LOG.info("Checkpoint completed... ready for chaos");
            int currentKillNumber = 1;
            int currentJobManagerKills = 0;
            int currentTaskManagerKills = 0;
            for (int i2 = 0; i2 < 5; ++i2) {
                LOG.info("Waiting for {} before next kill ({}/{})", new Object[]{killEvery, currentKillNumber++, 5});
                Thread.sleep(killEvery.toMillis());
                LOG.info("Checking job status...");
                JobStatus jobStatus = this.requestJobStatus(jobGraph.getJobID(), (JobManagerProcess)jobManagerProcesses.get(leaderIndex), testActorSystem, deadline.timeLeft());
                if (jobStatus != JobStatus.RUNNING && jobStatus != JobStatus.FINISHED) {
                    LOG.info("Waiting for job status {}", (Object)JobStatus.RUNNING);
                    this.waitForJobRunning(jobGraph.getJobID(), (JobManagerProcess)jobManagerProcesses.get(leaderIndex), testActorSystem, deadline.timeLeft());
                } else {
                    if (jobStatus == JobStatus.FINISHED) {
                        LOG.info("Job finished");
                        return;
                    }
                    LOG.info("Job status is {}", (Object)jobStatus);
                }
                if (this.rand.nextBoolean()) {
                    LOG.info("Killing the leading JobManager");
                    JobManagerProcess newJobManager = this.createAndStartJobManagerProcess(config);
                    JobManagerProcess leader = (JobManagerProcess)jobManagerProcesses.remove(leaderIndex);
                    leader.destroy();
                    ++currentJobManagerKills;
                    LOG.info("Killed {}", (Object)leader);
                    jobManagerProcesses.add(newJobManager);
                    leaderIndex = this.waitForNewLeader(leaderListener, jobManagerProcesses, deadline.timeLeft());
                    this.waitForTaskManagers(3, (JobManagerProcess)jobManagerProcesses.get(leaderIndex), testActorSystem, deadline.timeLeft());
                    continue;
                }
                LOG.info("Killing a random TaskManager");
                TaskManagerProcess newTaskManager = this.createAndStartTaskManagerProcess(config);
                this.waitForTaskManagers(4, (JobManagerProcess)jobManagerProcesses.get(leaderIndex), testActorSystem, deadline.timeLeft());
                int next = this.rand.nextInt(3);
                TaskManagerProcess taskManager = (TaskManagerProcess)taskManagerProcesses.remove(next);
                LOG.info("{} has been chosen. Killing process...", (Object)taskManager);
                taskManager.destroy();
                ++currentTaskManagerKills;
                taskManagerProcesses.add(newTaskManager);
            }
            LOG.info("Chaos is over. Total kills: {} ({} job manager + {} task managers). Checking job status...", new Object[]{5, currentJobManagerKills, currentTaskManagerKills});
            TestJvmProcess.touchFile((File)ProceedCoordination);
            LOG.info("Waiting for job status {}", (Object)JobStatus.FINISHED);
            this.waitForJobFinished(jobGraph.getJobID(), (JobManagerProcess)jobManagerProcesses.get(leaderIndex), testActorSystem, deadline.timeLeft());
            LOG.info("Job finished");
            LOG.info("Waiting for job removal");
            this.waitForJobRemoved(jobGraph.getJobID(), (JobManagerProcess)jobManagerProcesses.get(leaderIndex), testActorSystem, deadline.timeLeft());
            LOG.info("Job removed");
            LOG.info("Checking clean recovery state...");
            this.checkCleanRecoveryState(config);
            LOG.info("Recovery state clean");
        }
        catch (Throwable t) {
            System.out.println("#################################################");
            System.out.println(" TASK MANAGERS");
            System.out.println("#################################################");
            for (TaskManagerProcess taskManagerProcess : taskManagerProcesses) {
                taskManagerProcess.printProcessLog();
            }
            System.out.println("#################################################");
            System.out.println(" JOB MANAGERS");
            System.out.println("#################################################");
            for (JobManagerProcess jobManagerProcess : jobManagerProcesses) {
                jobManagerProcess.printProcessLog();
            }
            t.printStackTrace();
        }
        finally {
            for (JobManagerProcess jobManagerProcess : jobManagerProcesses) {
                if (jobManagerProcess == null) continue;
                jobManagerProcess.destroy();
            }
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            if (testActorSystem != null) {
                testActorSystem.shutdown();
            }
        }
    }

    private JobGraph createJobGraph(int n, String completedCheckpointMarker, String proceedMarker, int parallelism, int checkpointingIntervalMs) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.enableCheckpointing((long)checkpointingIntervalMs);
        int expectedResult = parallelism * n * (n + 1) / 2;
        env.addSource((SourceFunction)new CheckpointedSequenceSource(n, completedCheckpointMarker, proceedMarker)).addSink((SinkFunction)new CountingSink(parallelism, expectedResult)).setParallelism(1);
        return env.getStreamGraph().getJobGraph();
    }

    private void submitJobGraph(JobGraph jobGraph, JobManagerProcess jobManager, TestingListener leaderListener, ActorSystem actorSystem, FiniteDuration timeout) throws Exception {
        ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, timeout);
        UUID jobManagerLeaderId = leaderListener.getLeaderSessionID();
        AkkaActorGateway jobManagerGateway = new AkkaActorGateway(jobManagerRef, jobManagerLeaderId);
        jobManagerGateway.tell((Object)new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
    }

    private void checkCleanRecoveryState(Configuration config) throws Exception {
        LOG.info("Checking " + ZooKeeper.getClientNamespace() + "/jobgraphs");
        List jobGraphs = ZooKeeper.getChildren("/jobgraphs");
        Assert.assertEquals((String)("Unclean job graphs: " + jobGraphs), (long)0L, (long)jobGraphs.size());
        LOG.info("Checking " + ZooKeeper.getClientNamespace() + "/checkpoints");
        List checkpoints = ZooKeeper.getChildren("/checkpoints");
        Assert.assertEquals((String)("Unclean checkpoints: " + checkpoints), (long)0L, (long)checkpoints.size());
        LOG.info("Checking " + ZooKeeper.getClientNamespace() + "/checkpoint-counter");
        List checkpointCounter = ZooKeeper.getChildren("/checkpoint-counter");
        Assert.assertEquals((String)("Unclean checkpoint counter: " + checkpointCounter), (long)0L, (long)checkpointCounter.size());
        LOG.info("ZooKeeper state is clean");
        LOG.info("Checking file system backend state...");
        File fsCheckpoints = new File(config.getString("state.backend.fs.checkpointdir", ""));
        LOG.info("Checking " + fsCheckpoints);
        File[] files = fsCheckpoints.listFiles();
        if (files == null) {
            Assert.fail((String)(fsCheckpoints + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles())));
        } else {
            Assert.assertEquals((String)("Unclean file system checkpoints: " + Arrays.toString(fsCheckpoints.listFiles())), (long)0L, (long)files.length);
        }
        File fsRecovery = new File(config.getString("recovery.zookeeper.storageDir", ""));
        LOG.info("Checking " + fsRecovery);
        files = fsRecovery.listFiles();
        if (files == null) {
            Assert.fail((String)(fsRecovery + " does not exist: " + Arrays.toString(FileStateBackendBasePath.listFiles())));
        } else {
            Assert.assertEquals((String)("Unclean file system checkpoints: " + Arrays.toString(fsRecovery.listFiles())), (long)0L, (long)files.length);
        }
    }

    private void waitForJobRemoved(JobID jobId, JobManagerProcess jobManager, ActorSystem actorSystem, FiniteDuration timeout) throws Exception {
        ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, timeout);
        AkkaActorGateway jobManagerGateway = new AkkaActorGateway(jobManagerRef, null);
        Future archiveFuture = jobManagerGateway.ask(JobManagerMessages.getRequestArchive(), timeout);
        ActorRef archive = ((JobManagerMessages.ResponseArchive)Await.result((Awaitable)archiveFuture, (Duration)timeout)).actor();
        AkkaActorGateway archiveGateway = new AkkaActorGateway(archive, null);
        Deadline deadline = timeout.fromNow();
        while (deadline.hasTimeLeft()) {
            JobManagerMessages.JobStatusResponse resp = JobManagerActorTestUtils.requestJobStatus((JobID)jobId, (ActorGateway)archiveGateway, (FiniteDuration)deadline.timeLeft());
            if (resp instanceof JobManagerMessages.JobNotFound) {
                Thread.sleep(100L);
                continue;
            }
            return;
        }
    }

    private JobStatus requestJobStatus(JobID jobId, JobManagerProcess jobManager, ActorSystem actorSystem, FiniteDuration timeout) throws Exception {
        ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, timeout);
        AkkaActorGateway jobManagerGateway = new AkkaActorGateway(jobManagerRef, null);
        JobManagerMessages.JobStatusResponse resp = JobManagerActorTestUtils.requestJobStatus((JobID)jobId, (ActorGateway)jobManagerGateway, (FiniteDuration)timeout);
        if (resp instanceof JobManagerMessages.CurrentJobStatus) {
            JobManagerMessages.CurrentJobStatus jobStatusResponse = (JobManagerMessages.CurrentJobStatus)resp;
            return jobStatusResponse.status();
        }
        if (resp instanceof JobManagerMessages.JobNotFound) {
            return JobStatus.RESTARTING;
        }
        throw new IllegalStateException("Unexpected response from JobManager");
    }

    private void waitForJobRunning(JobID jobId, JobManagerProcess jobManager, ActorSystem actorSystem, FiniteDuration timeout) throws Exception {
        ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, timeout);
        AkkaActorGateway jobManagerGateway = new AkkaActorGateway(jobManagerRef, null);
        JobManagerActorTestUtils.waitForJobStatus((JobID)jobId, (JobStatus)JobStatus.RUNNING, (ActorGateway)jobManagerGateway, (FiniteDuration)timeout);
    }

    private void waitForJobFinished(JobID jobId, JobManagerProcess jobManager, ActorSystem actorSystem, FiniteDuration timeout) throws Exception {
        ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, timeout);
        AkkaActorGateway jobManagerGateway = new AkkaActorGateway(jobManagerRef, null);
        JobManagerActorTestUtils.waitForJobStatus((JobID)jobId, (JobStatus)JobStatus.FINISHED, (ActorGateway)jobManagerGateway, (FiniteDuration)timeout);
    }

    private void waitForTaskManagers(int minimumNumberOfTaskManagers, JobManagerProcess jobManager, ActorSystem actorSystem, FiniteDuration timeout) throws Exception {
        LOG.info("Waiting for {} task managers to connect to leading {}", (Object)minimumNumberOfTaskManagers, (Object)jobManager);
        ActorRef jobManagerRef = jobManager.getActorRef(actorSystem, timeout);
        AkkaActorGateway jobManagerGateway = new AkkaActorGateway(jobManagerRef, null);
        JobManagerActorTestUtils.waitForTaskManagers((int)minimumNumberOfTaskManagers, (ActorGateway)jobManagerGateway, (FiniteDuration)timeout);
        LOG.info("All task managers connected");
    }

    private int waitForNewLeader(TestingListener leaderListener, List<JobManagerProcess> jobManagerProcesses, FiniteDuration timeout) throws Exception {
        LOG.info("Waiting for new leader notification");
        leaderListener.waitForNewLeader(timeout.toMillis());
        LOG.info("Leader: {}:{}", (Object)leaderListener.getAddress(), (Object)leaderListener.getLeaderSessionID());
        String currentLeader = leaderListener.getAddress();
        int leaderIndex = -1;
        for (int i = 0; i < jobManagerProcesses.size(); ++i) {
            JobManagerProcess jobManager = jobManagerProcesses.get(i);
            if (!jobManager.getJobManagerAkkaURL().equals(currentLeader)) continue;
            leaderIndex = i;
            break;
        }
        if (leaderIndex == -1) {
            throw new IllegalStateException("Failed to determine which process is leader");
        }
        return leaderIndex;
    }

    private JobManagerProcess createAndStartJobManagerProcess(Configuration config) throws Exception {
        JobManagerProcess jobManager = new JobManagerProcess(this.jobManagerPid++, config);
        jobManager.createAndStart();
        LOG.info("Created and started {}.", (Object)jobManager);
        return jobManager;
    }

    private TaskManagerProcess createAndStartTaskManagerProcess(Configuration config) throws Exception {
        TaskManagerProcess taskManager = new TaskManagerProcess(this.taskManagerPid++, config);
        taskManager.createAndStart();
        LOG.info("Created and started {}.", (Object)taskManager);
        return taskManager;
    }

    static {
        try {
            FileStateBackendBasePath = CommonTestUtils.createTempDirectory();
            CheckpointCompletedCoordination = new File(FileStateBackendBasePath, COMPLETED_PREFIX);
            ProceedCoordination = new File(FileStateBackendBasePath, "proceed");
        }
        catch (IOException e) {
            throw new RuntimeException("Error in test setup. Could not create directory.", e);
        }
    }

    public static class CountingSink
    extends RichSinkFunction<Long>
    implements Checkpointed<CountingSink>,
    CheckpointNotifier {
        private static final Logger LOG = LoggerFactory.getLogger(CountingSink.class);
        private static final long serialVersionUID = 0L;
        private final int parallelism;
        private final long expectedFinalCount;
        private long current;
        private int numberOfReceivedLastElements;

        public CountingSink(int parallelism, long expectedFinalCount) {
            this.expectedFinalCount = expectedFinalCount;
            this.parallelism = parallelism;
        }

        public void invoke(Long value) throws Exception {
            if (value == -1L) {
                ++this.numberOfReceivedLastElements;
                if (this.numberOfReceivedLastElements == this.parallelism) {
                    if (this.current != this.expectedFinalCount) {
                        throw new Exception("Unexpected final result " + this.current);
                    }
                    LOG.info("Final result " + this.current);
                } else if (this.numberOfReceivedLastElements > this.parallelism) {
                    throw new IllegalStateException("Received more elements than parallelism.");
                }
            } else {
                this.current += value.longValue();
            }
        }

        public CountingSink snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            LOG.info("Snapshotting state {}:{} @ ID {}.", new Object[]{this.current, this.numberOfReceivedLastElements, checkpointId});
            return this;
        }

        public void restoreState(CountingSink state) {
            LOG.info("Restoring state {}:{}", (Object)state.current, (Object)state.numberOfReceivedLastElements);
            this.current = state.current;
            this.numberOfReceivedLastElements = state.numberOfReceivedLastElements;
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            LOG.info("Checkpoint {} completed.", (Object)checkpointId);
        }
    }

    public static class CheckpointedSequenceSource
    extends RichParallelSourceFunction<Long>
    implements Checkpointed<Long>,
    CheckpointNotifier {
        private static final long serialVersionUID = 0L;
        private final long end;
        private final String completedCheckpointMarkerFilePath;
        private final File proceedFile;
        private long current = 0L;
        private volatile boolean isRunning = true;

        public CheckpointedSequenceSource(long end, String completedCheckpointMarkerFilePath, String proceedMarkerFilePath) {
            Preconditions.checkArgument((end >= 0L ? 1 : 0) != 0, (Object)"Negative final count");
            this.end = end;
            this.completedCheckpointMarkerFilePath = completedCheckpointMarkerFilePath;
            this.proceedFile = new File(proceedMarkerFilePath);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            while (this.isRunning) {
                if (!this.proceedFile.exists()) {
                    Thread.sleep(50L);
                }
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    if (this.current > this.end) {
                        ctx.collect((Object)-1L);
                        return;
                    }
                    ctx.collect((Object)this.current++);
                }
            }
        }

        public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            LOG.info("Snapshotting state {} @ ID {}.", (Object)this.current, (Object)checkpointId);
            return this.current;
        }

        public void restoreState(Long state) {
            LOG.info("Restoring state {}/{}", (Object)state, (Object)this.end);
            this.current = state;
        }

        public void cancel() {
            this.isRunning = false;
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            LOG.info("Checkpoint {} completed.", (Object)checkpointId);
            int taskIndex = this.getRuntimeContext().getIndexOfThisSubtask();
            TestJvmProcess.touchFile((File)new File(this.completedCheckpointMarkerFilePath + taskIndex));
        }
    }
}

