package org.apache.flink.test.recovery;

import akka.actor.ActorSystem;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.testutils.junit.RetryOnFailure;
import org.apache.flink.testutils.junit.RetryRule;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.class */
public class JobManagerHACheckpointRecoveryITCase extends TestLogger {

    @Rule
    public RetryRule retryRule = new RetryRule();
    private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
    private static final FiniteDuration TestTimeOut = new FiniteDuration(5, TimeUnit.MINUTES);
    private static final File FileStateBackendBasePath;
    private static final int Parallelism = 8;
    private static CountDownLatch CompletedCheckpointsLatch;
    private static AtomicLongArray RecoveredStates;
    private static CountDownLatch FinalCountLatch;
    private static AtomicReference<Long> FinalCount;
    private static long LastElement;

    /* loaded from: input_file:org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase$CheckpointedSequenceSource.class */
    public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long> implements Checkpointed<Long> {
        private static final long serialVersionUID = 0;
        private final long end;
        private long current = 0;
        private volatile boolean isRunning = true;
        private static final Logger LOG = LoggerFactory.getLogger(CheckpointedSequenceSource.class);
        private static final CountDownLatch sync = new CountDownLatch(JobManagerHACheckpointRecoveryITCase.Parallelism);

        public CheckpointedSequenceSource(long j) {
            Preconditions.checkArgument(j >= 0, "Negative final count");
            this.end = j;
        }

        public void run(SourceFunction.SourceContext<Long> sourceContext) throws Exception {
            while (this.isRunning) {
                synchronized (sourceContext.getCheckpointLock()) {
                    if (this.current > this.end) {
                        sourceContext.collect(Long.valueOf(JobManagerHACheckpointRecoveryITCase.LastElement));
                        return;
                    } else {
                        long j = this.current;
                        this.current = j + 1;
                        sourceContext.collect(Long.valueOf(j));
                    }
                }
                if (sync.getCount() != 0) {
                    Thread.sleep(100L);
                }
            }
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public Long m640snapshotState(long j, long j2) throws Exception {
            LOG.debug("Snapshotting state {} @ ID {}.", Long.valueOf(this.current), Long.valueOf(j));
            return Long.valueOf(this.current);
        }

        public void restoreState(Long l) {
            LOG.debug("Restoring state {}", l);
            JobManagerHACheckpointRecoveryITCase.RecoveredStates.set(getRuntimeContext().getIndexOfThisSubtask(), l.longValue());
            sync.countDown();
            this.current = l.longValue();
        }

        public void cancel() {
            this.isRunning = false;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase$CountingSink.class */
    public static class CountingSink implements SinkFunction<Long>, Checkpointed<CountingSink>, CheckpointListener {
        private static final Logger LOG = LoggerFactory.getLogger(CountingSink.class);
        private static final long serialVersionUID = 1436484290453629091L;
        private long current = 0;
        private int numberOfReceivedLastElements;

        public void invoke(Long l) throws Exception {
            if (l.longValue() != JobManagerHACheckpointRecoveryITCase.LastElement) {
                this.current += l.longValue();
                return;
            }
            this.numberOfReceivedLastElements++;
            if (this.numberOfReceivedLastElements == JobManagerHACheckpointRecoveryITCase.Parallelism) {
                JobManagerHACheckpointRecoveryITCase.FinalCount.set(Long.valueOf(this.current));
                JobManagerHACheckpointRecoveryITCase.FinalCountLatch.countDown();
            } else if (this.numberOfReceivedLastElements > JobManagerHACheckpointRecoveryITCase.Parallelism) {
                throw new IllegalStateException("Received more elements than parallelism.");
            }
        }

        /* renamed from: snapshotState, reason: merged with bridge method [inline-methods] */
        public CountingSink m642snapshotState(long j, long j2) throws Exception {
            LOG.debug("Snapshotting state {}:{} @ ID {}.", new Object[]{Long.valueOf(this.current), Integer.valueOf(this.numberOfReceivedLastElements), Long.valueOf(j)});
            return this;
        }

        public void restoreState(CountingSink countingSink) {
            LOG.debug("Restoring state {}:{}", Long.valueOf(countingSink.current), Integer.valueOf(countingSink.numberOfReceivedLastElements));
            this.current = countingSink.current;
            this.numberOfReceivedLastElements = countingSink.numberOfReceivedLastElements;
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            LOG.debug("Checkpoint {} completed.", Long.valueOf(j));
            JobManagerHACheckpointRecoveryITCase.CompletedCheckpointsLatch.countDown();
        }
    }

    @AfterClass
    public static void tearDown() throws Exception {
        ZooKeeper.shutdown();
        if (FileStateBackendBasePath != null) {
            FileUtils.deleteDirectory(FileStateBackendBasePath);
        }
    }

    @Before
    public void cleanUp() throws Exception {
        if (FileStateBackendBasePath != null && FileStateBackendBasePath.exists()) {
            FileUtils.cleanDirectory(FileStateBackendBasePath);
        }
        ZooKeeper.deleteAll();
    }

    @Test
    @RetryOnFailure(times = 1)
    public void testCheckpointedStreamingSumProgram() throws Exception {
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
        createLocalEnvironment.setParallelism(Parallelism);
        createLocalEnvironment.enableCheckpointing(200L);
        createLocalEnvironment.addSource(new CheckpointedSequenceSource(5000L)).addSink(new CountingSink()).setParallelism(1);
        JobGraph jobGraph = createLocalEnvironment.getStreamGraph().getJobGraph();
        Configuration createZooKeeperRecoveryModeConfig = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper.getConnectString(), FileStateBackendBasePath.getAbsoluteFile().toURI().toString());
        createZooKeeperRecoveryModeConfig.setInteger("taskmanager.numberOfTaskSlots", Parallelism);
        ActorSystem actorSystem = null;
        JobManagerProcess[] jobManagerProcessArr = new JobManagerProcess[2];
        LeaderRetrievalService leaderRetrievalService = null;
        ActorSystem actorSystem2 = null;
        try {
            try {
                Deadline fromNow = TestTimeOut.fromNow();
                actorSystem = AkkaUtils.createActorSystem(new Configuration(), new Some(new Tuple2("localhost", 0)));
                jobManagerProcessArr[0] = new JobManagerProcess(0, createZooKeeperRecoveryModeConfig);
                jobManagerProcessArr[1] = new JobManagerProcess(1, createZooKeeperRecoveryModeConfig);
                jobManagerProcessArr[0].startProcess();
                jobManagerProcessArr[1].startProcess();
                TestingListener testingListener = new TestingListener();
                leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(createZooKeeperRecoveryModeConfig);
                leaderRetrievalService.start(testingListener);
                actorSystem2 = AkkaUtils.createActorSystem(createZooKeeperRecoveryModeConfig, Option.apply(new Tuple2("localhost", 0)));
                TaskManager.startTaskManagerComponentsAndActor(createZooKeeperRecoveryModeConfig, ResourceID.generate(), actorSystem2, "localhost", Option.empty(), Option.empty(), false, TaskManager.class);
                testingListener.waitForNewLeader(fromNow.timeLeft().toMillis());
                AkkaActorGateway akkaActorGateway = new AkkaActorGateway(AkkaUtils.getActorRef(testingListener.getAddress(), actorSystem, fromNow.timeLeft()), testingListener.getLeaderSessionID());
                akkaActorGateway.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
                JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, akkaActorGateway, fromNow.timeLeft());
                JobManagerProcess jobManagerProcess = jobManagerProcessArr[0].getJobManagerAkkaURL(fromNow.timeLeft()).equals(testingListener.getAddress()) ? jobManagerProcessArr[0] : jobManagerProcessArr[1];
                CompletedCheckpointsLatch.await();
                jobManagerProcess.destroy();
                testingListener.waitForNewLeader(fromNow.timeLeft().toMillis());
                JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, new AkkaActorGateway(AkkaUtils.getActorRef(testingListener.getAddress(), actorSystem, fromNow.timeLeft()), testingListener.getLeaderSessionID()), fromNow.timeLeft());
                FinalCountLatch.await();
                Assert.assertEquals(100020000L, FinalCount.get().longValue());
                for (int i = 0; i < Parallelism; i++) {
                    Assert.assertNotEquals(0L, RecoveredStates.get(i));
                }
                if (jobManagerProcessArr[0] != null) {
                    jobManagerProcessArr[0].destroy();
                }
                if (jobManagerProcessArr[1] != null) {
                    jobManagerProcessArr[1].destroy();
                }
                if (leaderRetrievalService != null) {
                    leaderRetrievalService.stop();
                }
                if (actorSystem2 != null) {
                    actorSystem2.shutdown();
                }
                if (actorSystem != null) {
                    actorSystem.shutdown();
                }
            } catch (Throwable th) {
                CompletedCheckpointsLatch = new CountDownLatch(2);
                RecoveredStates = new AtomicLongArray(Parallelism);
                FinalCountLatch = new CountDownLatch(1);
                FinalCount = new AtomicReference<>();
                LastElement = -1L;
                th.printStackTrace();
                if (jobManagerProcessArr[0] != null) {
                    jobManagerProcessArr[0].printProcessLog();
                }
                if (jobManagerProcessArr[1] != null) {
                    jobManagerProcessArr[1].printProcessLog();
                }
                throw th;
            }
        } catch (Throwable th2) {
            if (jobManagerProcessArr[0] != null) {
                jobManagerProcessArr[0].destroy();
            }
            if (jobManagerProcessArr[1] != null) {
                jobManagerProcessArr[1].destroy();
            }
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            if (actorSystem2 != null) {
                actorSystem2.shutdown();
            }
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            throw th2;
        }
    }

    @Test
    @RetryOnFailure(times = 1)
    public void testCheckpointRecoveryFailure() throws Exception {
        JobManagerProcess jobManagerProcess;
        JobManagerProcess jobManagerProcess2;
        Deadline fromNow = TestTimeOut.fromNow();
        Configuration createZooKeeperRecoveryModeConfig = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(ZooKeeper.getConnectString(), FileStateBackendBasePath.getAbsoluteFile().toString());
        createZooKeeperRecoveryModeConfig.setInteger("local.number-jobmanager", 2);
        JobManagerProcess[] jobManagerProcessArr = new JobManagerProcess[2];
        LeaderRetrievalService leaderRetrievalService = null;
        ActorSystem actorSystem = null;
        ActorSystem actorSystem2 = null;
        try {
            try {
                actorSystem2 = AkkaUtils.createActorSystem(new Configuration(), new Some(new Tuple2("localhost", 0)));
                jobManagerProcessArr[0] = new JobManagerProcess(0, createZooKeeperRecoveryModeConfig);
                jobManagerProcessArr[1] = new JobManagerProcess(1, createZooKeeperRecoveryModeConfig);
                jobManagerProcessArr[0].startProcess();
                jobManagerProcessArr[1].startProcess();
                TestingListener testingListener = new TestingListener();
                leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService(createZooKeeperRecoveryModeConfig);
                leaderRetrievalService.start(testingListener);
                actorSystem = AkkaUtils.createActorSystem(createZooKeeperRecoveryModeConfig, Option.apply(new Tuple2("localhost", 0)));
                TaskManager.startTaskManagerComponentsAndActor(createZooKeeperRecoveryModeConfig, ResourceID.generate(), actorSystem, "localhost", Option.empty(), Option.empty(), false, TaskManager.class);
                testingListener.waitForNewLeader(fromNow.timeLeft().toMillis());
                AkkaActorGateway akkaActorGateway = new AkkaActorGateway(AkkaUtils.getActorRef(testingListener.getAddress(), actorSystem2, fromNow.timeLeft()), testingListener.getLeaderSessionID());
                if (jobManagerProcessArr[0].getJobManagerAkkaURL(fromNow.timeLeft()).equals(testingListener.getAddress())) {
                    jobManagerProcess = jobManagerProcessArr[0];
                    jobManagerProcess2 = jobManagerProcessArr[1];
                } else {
                    jobManagerProcess = jobManagerProcessArr[1];
                    jobManagerProcess2 = jobManagerProcessArr[0];
                }
                JobVertex jobVertex = new JobVertex("Blocking vertex");
                jobVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
                JobGraph jobGraph = new JobGraph(new JobVertex[]{jobVertex});
                akkaActorGateway.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
                JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, akkaActorGateway, fromNow.timeLeft());
                FileUtils.deleteDirectory(FileStateBackendBasePath);
                jobManagerProcess.destroy();
                boolean z = false;
                while (true) {
                    if (!fromNow.hasTimeLeft()) {
                        break;
                    }
                    String processOutput = jobManagerProcess2.getProcessOutput();
                    if (processOutput == null) {
                        this.log.warn("No process output available.");
                    } else if (processOutput.contains("Fatal error: Failed to recover jobs") && processOutput.contains("java.io.FileNotFoundException")) {
                        z = true;
                        break;
                    }
                    Thread.sleep(500L);
                }
                Assert.assertTrue("Did not find expected output in logs.", z);
                if (jobManagerProcessArr[0] != null) {
                    jobManagerProcessArr[0].destroy();
                }
                if (jobManagerProcessArr[1] != null) {
                    jobManagerProcessArr[1].destroy();
                }
                if (leaderRetrievalService != null) {
                    leaderRetrievalService.stop();
                }
                if (actorSystem != null) {
                    actorSystem.shutdown();
                }
                if (actorSystem2 != null) {
                    actorSystem2.shutdown();
                }
            } catch (Throwable th) {
                th.printStackTrace();
                if (jobManagerProcessArr[0] != null) {
                    jobManagerProcessArr[0].printProcessLog();
                }
                if (jobManagerProcessArr[1] != null) {
                    jobManagerProcessArr[1].printProcessLog();
                }
                throw th;
            }
        } catch (Throwable th2) {
            if (jobManagerProcessArr[0] != null) {
                jobManagerProcessArr[0].destroy();
            }
            if (jobManagerProcessArr[1] != null) {
                jobManagerProcessArr[1].destroy();
            }
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            if (actorSystem2 != null) {
                actorSystem2.shutdown();
            }
            throw th2;
        }
    }

    static {
        try {
            FileStateBackendBasePath = CommonTestUtils.createTempDirectory();
            CompletedCheckpointsLatch = new CountDownLatch(2);
            RecoveredStates = new AtomicLongArray(Parallelism);
            FinalCountLatch = new CountDownLatch(1);
            FinalCount = new AtomicReference<>();
            LastElement = -1L;
        } catch (IOException e) {
            throw new RuntimeException("Error in test setup. Could not create directory.", e);
        }
    }
}
