/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.runtime.testutils.JobManagerProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

public class JobManagerCheckpointRecoveryITCase
extends TestLogger {
    private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);
    private static final FiniteDuration TestTimeOut = new FiniteDuration(5L, TimeUnit.MINUTES);
    private static final File FileStateBackendBasePath;
    private static final int Parallelism = 8;
    private static final CountDownLatch CompletedCheckpointsLatch;
    private static final AtomicLongArray RecoveredStates;
    private static final CountDownLatch FinalCountLatch;
    private static final AtomicReference<Long> FinalCount;
    private static final long LastElement = -1L;

    @AfterClass
    public static void tearDown() throws Exception {
        ZooKeeper.shutdown();
        if (FileStateBackendBasePath != null) {
            FileUtils.deleteDirectory((File)FileStateBackendBasePath);
        }
    }

    @Before
    public void cleanUp() throws Exception {
        if (FileStateBackendBasePath != null) {
            FileUtils.cleanDirectory((File)FileStateBackendBasePath);
        }
        ZooKeeper.deleteAll();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCheckpointedStreamingSumProgram() throws Exception {
        int checkpointingInterval = 200;
        int sequenceEnd = 5000;
        long expectedSum = 100020000L;
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(8);
        env.enableCheckpointing(200L);
        env.addSource((SourceFunction)new CheckpointedSequenceSource(5000L)).addSink((SinkFunction)new CountingSink()).setParallelism(1);
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig((String)ZooKeeper.getConnectString(), (String)FileStateBackendBasePath.getAbsoluteFile().toURI().toString());
        config.setInteger("taskmanager.numberOfTaskSlots", 8);
        ActorSystem testSystem = null;
        JobManagerProcess[] jobManagerProcess = new JobManagerProcess[2];
        ZooKeeperLeaderRetrievalService leaderRetrievalService = null;
        ActorSystem taskManagerSystem = null;
        try {
            Deadline deadline = TestTimeOut.fromNow();
            testSystem = AkkaUtils.createActorSystem((Configuration)new Configuration(), (Option)new Some((Object)new Tuple2((Object)"localhost", (Object)0)));
            jobManagerProcess[0] = new JobManagerProcess(0, config);
            jobManagerProcess[1] = new JobManagerProcess(1, config);
            jobManagerProcess[0].createAndStart();
            jobManagerProcess[1].createAndStart();
            TestingListener leaderListener = new TestingListener();
            leaderRetrievalService = ZooKeeperUtils.createLeaderRetrievalService((Configuration)config);
            leaderRetrievalService.start((LeaderRetrievalListener)leaderListener);
            taskManagerSystem = AkkaUtils.createActorSystem((Config)AkkaUtils.getDefaultAkkaConfig());
            TaskManager.startTaskManagerComponentsAndActor((Configuration)config, (ActorSystem)taskManagerSystem, (String)"localhost", (Option)Option.empty(), (Option)Option.empty(), (boolean)false, (StreamingMode)StreamingMode.STREAMING, TaskManager.class);
            leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
            String leaderAddress = leaderListener.getAddress();
            UUID leaderId = leaderListener.getLeaderSessionID();
            ActorRef leaderRef = AkkaUtils.getActorRef((String)leaderAddress, (ActorSystem)testSystem, (FiniteDuration)deadline.timeLeft());
            AkkaActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
            leader.tell((Object)new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED));
            JobManagerActorTestUtils.waitForJobStatus((JobID)jobGraph.getJobID(), (JobStatus)JobStatus.RUNNING, (ActorGateway)leader, (FiniteDuration)deadline.timeLeft());
            JobManagerProcess leadingJobManagerProcess = jobManagerProcess[0].getJobManagerAkkaURL().equals(leaderListener.getAddress()) ? jobManagerProcess[0] : jobManagerProcess[1];
            CompletedCheckpointsLatch.await();
            leadingJobManagerProcess.destroy();
            leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
            String leaderAddress2 = leaderListener.getAddress();
            UUID leaderId2 = leaderListener.getLeaderSessionID();
            ActorRef leaderRef2 = AkkaUtils.getActorRef((String)leaderAddress2, (ActorSystem)testSystem, (FiniteDuration)deadline.timeLeft());
            AkkaActorGateway leader2 = new AkkaActorGateway(leaderRef2, leaderId2);
            JobManagerActorTestUtils.waitForJobStatus((JobID)jobGraph.getJobID(), (JobStatus)JobStatus.RUNNING, (ActorGateway)leader2, (FiniteDuration)deadline.timeLeft());
            FinalCountLatch.await();
            Assert.assertEquals((long)100020000L, (long)FinalCount.get());
            for (int i = 0; i < 8; ++i) {
                Assert.assertNotEquals((long)0L, (long)RecoveredStates.get(i));
            }
        }
        catch (Throwable t) {
            if (jobManagerProcess[0] != null) {
                jobManagerProcess[0].printProcessLog();
            }
            if (jobManagerProcess[1] != null) {
                jobManagerProcess[1].printProcessLog();
            }
            t.printStackTrace();
        }
        finally {
            if (jobManagerProcess[0] != null) {
                jobManagerProcess[0].destroy();
            }
            if (jobManagerProcess[1] != null) {
                jobManagerProcess[1].destroy();
            }
            if (leaderRetrievalService != null) {
                leaderRetrievalService.stop();
            }
            if (taskManagerSystem != null) {
                taskManagerSystem.shutdown();
            }
            if (testSystem != null) {
                testSystem.shutdown();
            }
        }
    }

    static {
        try {
            FileStateBackendBasePath = CommonTestUtils.createTempDirectory();
        }
        catch (IOException e) {
            throw new RuntimeException("Error in test setup. Could not create directory.", e);
        }
        CompletedCheckpointsLatch = new CountDownLatch(2);
        RecoveredStates = new AtomicLongArray(8);
        FinalCountLatch = new CountDownLatch(1);
        FinalCount = new AtomicReference();
    }

    public static class CountingSink
    implements SinkFunction<Long>,
    Checkpointed<CountingSink>,
    CheckpointNotifier {
        private static final Logger LOG = LoggerFactory.getLogger(CountingSink.class);
        private static final long serialVersionUID = 1436484290453629091L;
        private long current = 0L;
        private int numberOfReceivedLastElements;

        public void invoke(Long value) throws Exception {
            if (value == -1L) {
                ++this.numberOfReceivedLastElements;
                if (this.numberOfReceivedLastElements == 8) {
                    FinalCount.set(this.current);
                    FinalCountLatch.countDown();
                } else if (this.numberOfReceivedLastElements > 8) {
                    throw new IllegalStateException("Received more elements than parallelism.");
                }
            } else {
                this.current += value.longValue();
            }
        }

        public CountingSink snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            LOG.debug("Snapshotting state {}:{} @ ID {}.", new Object[]{this.current, this.numberOfReceivedLastElements, checkpointId});
            return this;
        }

        public void restoreState(CountingSink state) {
            LOG.debug("Restoring state {}:{}", (Object)state.current, (Object)state.numberOfReceivedLastElements);
            this.current = state.current;
            this.numberOfReceivedLastElements = state.numberOfReceivedLastElements;
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            LOG.debug("Checkpoint {} completed.", (Object)checkpointId);
            CompletedCheckpointsLatch.countDown();
        }
    }

    public static class CheckpointedSequenceSource
    extends RichParallelSourceFunction<Long>
    implements Checkpointed<Long> {
        private static final Logger LOG = LoggerFactory.getLogger(CheckpointedSequenceSource.class);
        private static final long serialVersionUID = 0L;
        private static final CountDownLatch sync = new CountDownLatch(8);
        private final long end;
        private long current = 0L;
        private volatile boolean isRunning = true;

        public CheckpointedSequenceSource(long end) {
            Preconditions.checkArgument((end >= 0L ? 1 : 0) != 0, (Object)"Negative final count");
            this.end = end;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            while (this.isRunning) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    if (this.current > this.end) {
                        ctx.collect((Object)-1L);
                        return;
                    }
                    ctx.collect((Object)this.current++);
                }
                if (sync.getCount() == 0L) continue;
                Thread.sleep(100L);
            }
        }

        public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            LOG.debug("Snapshotting state {} @ ID {}.", (Object)this.current, (Object)checkpointId);
            return this.current;
        }

        public void restoreState(Long state) {
            LOG.debug("Restoring state {}", (Object)state);
            RecoveredStates.set(this.getRuntimeContext().getIndexOfThisSubtask(), state);
            sync.countDown();
            this.current = state;
        }

        public void cancel() {
            this.isRunning = false;
        }
    }
}

