/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.pattern.Patterns;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class TaskManagerFailureRecoveryITCase {
    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRestartWithFailingTaskManager() {
        int PARALLELISM = 4;
        ForkableFlinkMiniCluster cluster = null;
        ActorSystem additionalSystem = null;
        try {
            int i;
            Configuration config = new Configuration();
            config.setInteger("localinstancemanager.numtaskmanager", 2);
            config.setInteger("taskmanager.numberOfTaskSlots", 4);
            config.setInteger("taskmanager.memory.size", 16);
            config.setString("akka.watch.heartbeat.interval", "500 ms");
            config.setString("akka.watch.heartbeat.pause", "20 s");
            config.setInteger("akka.watch.threshold", 20);
            cluster = new ForkableFlinkMiniCluster(config, false);
            ArrayList resultCollection = new ArrayList();
            final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getJobManagerRPCPort(), (String[])new String[0]);
            env.setParallelism(4);
            env.setNumberOfExecutionRetries(1);
            env.getConfig().disableSysoutLogging();
            env.generateSequence(1L, 10L).map(new FailingMapper()).reduce((ReduceFunction)new ReduceFunction<Long>(){

                public Long reduce(Long value1, Long value2) {
                    return value1 + value2;
                }
            }).output((OutputFormat)new LocalCollectionOutputFormat(resultCollection));
            final AtomicReference ref = new AtomicReference();
            Thread trigger = new Thread("program trigger"){

                @Override
                public void run() {
                    try {
                        env.execute();
                    }
                    catch (Throwable t) {
                        ref.set(t);
                    }
                }
            };
            trigger.setDaemon(true);
            trigger.start();
            for (int i2 = 0; i2 < 4; ++i2) {
                FailingMapper.TASK_TO_COORD_QUEUE.take();
            }
            additionalSystem = cluster.startTaskManagerActorSystem(2);
            ActorRef additionalTaskManager = cluster.startTaskManager(2, additionalSystem);
            TaskManagerMessages.NotifyWhenRegisteredAtJobManager$ message = TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage();
            Future future = Patterns.ask((ActorRef)additionalTaskManager, (Object)message, (long)30000L);
            try {
                Await.result((Awaitable)future, (Duration)new FiniteDuration(30000L, TimeUnit.MILLISECONDS));
            }
            catch (TimeoutException e) {
                Assert.fail((String)"The additional TaskManager did not come up within 30 seconds");
            }
            for (ActorRef tm : cluster.getTaskManagersAsJava()) {
                tm.tell((Object)PoisonPill.getInstance(), null);
            }
            for (i = 0; i < 4; ++i) {
                FailingMapper.TASK_TO_COORD_QUEUE.take();
            }
            for (i = 0; i < 4; ++i) {
                FailingMapper.COORD_TO_TASK_QUEUE.add(new Object());
            }
            trigger.join();
            if (ref.get() != null) {
                Throwable t = (Throwable)ref.get();
                t.printStackTrace();
                Assert.fail((String)("Program execution caused an exception: " + t.getMessage()));
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            if (additionalSystem != null) {
                additionalSystem.shutdown();
            }
            if (cluster != null) {
                cluster.stop();
            }
        }
    }

    private static class FailingMapper<T>
    extends RichMapFunction<T, T> {
        private static final long serialVersionUID = 4435412404173331157L;
        private static final BlockingQueue<Object> TASK_TO_COORD_QUEUE = new LinkedBlockingQueue<Object>();
        private static final BlockingQueue<Object> COORD_TO_TASK_QUEUE = new LinkedBlockingQueue<Object>();

        private FailingMapper() {
        }

        public void open(Configuration parameters) throws Exception {
            TASK_TO_COORD_QUEUE.add(new Object());
            COORD_TO_TASK_QUEUE.take();
        }

        public T map(T value) throws Exception {
            return value;
        }
    }
}

