package org.apache.flink.test.recovery;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.pattern.Patterns;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.class */
public class TaskManagerFailureRecoveryITCase {

    /* loaded from: input_file:org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase$FailingMapper.class */
    private static class FailingMapper<T> extends RichMapFunction<T, T> {
        private static final long serialVersionUID = 4435412404173331157L;
        private static final BlockingQueue<Object> TASK_TO_COORD_QUEUE = new LinkedBlockingQueue();
        private static final BlockingQueue<Object> COORD_TO_TASK_QUEUE = new LinkedBlockingQueue();

        private FailingMapper() {
        }

        public void open(Configuration configuration) throws Exception {
            TASK_TO_COORD_QUEUE.add(new Object());
            COORD_TO_TASK_QUEUE.take();
        }

        public T map(T t) throws Exception {
            return t;
        }
    }

    @Test
    public void testRestartWithFailingTaskManager() {
        ForkableFlinkMiniCluster forkableFlinkMiniCluster = null;
        ActorSystem actorSystem = null;
        try {
            try {
                Configuration configuration = new Configuration();
                configuration.setInteger("local.number-taskmanager", 2);
                configuration.setInteger("taskmanager.numberOfTaskSlots", 4);
                configuration.setInteger("taskmanager.memory.size", 16);
                configuration.setString("akka.watch.heartbeat.interval", "500 ms");
                configuration.setString("akka.watch.heartbeat.pause", "20 s");
                configuration.setInteger("akka.watch.threshold", 20);
                forkableFlinkMiniCluster = new ForkableFlinkMiniCluster(configuration, false);
                forkableFlinkMiniCluster.start();
                ArrayList arrayList = new ArrayList();
                final ExecutionEnvironment createRemoteEnvironment = ExecutionEnvironment.createRemoteEnvironment("localhost", forkableFlinkMiniCluster.getLeaderRPCPort(), new String[0]);
                createRemoteEnvironment.setParallelism(4);
                createRemoteEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 1000L));
                createRemoteEnvironment.getConfig().disableSysoutLogging();
                createRemoteEnvironment.generateSequence(1L, 10L).map(new FailingMapper()).reduce(new ReduceFunction<Long>() { // from class: org.apache.flink.test.recovery.TaskManagerFailureRecoveryITCase.1
                    public Long reduce(Long l, Long l2) {
                        return Long.valueOf(l.longValue() + l2.longValue());
                    }
                }).output(new LocalCollectionOutputFormat(arrayList));
                final AtomicReference atomicReference = new AtomicReference();
                Thread thread = new Thread("program trigger") { // from class: org.apache.flink.test.recovery.TaskManagerFailureRecoveryITCase.2
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        try {
                            createRemoteEnvironment.execute();
                        } catch (Throwable th) {
                            atomicReference.set(th);
                        }
                    }
                };
                thread.setDaemon(true);
                thread.start();
                for (int i = 0; i < 4; i++) {
                    FailingMapper.TASK_TO_COORD_QUEUE.take();
                }
                actorSystem = forkableFlinkMiniCluster.startTaskManagerActorSystem(2);
                try {
                    Await.result(Patterns.ask(forkableFlinkMiniCluster.startTaskManager(2, actorSystem), TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), 30000L), new FiniteDuration(30000L, TimeUnit.MILLISECONDS));
                } catch (TimeoutException e) {
                    Assert.fail("The additional TaskManager did not come up within 30 seconds");
                }
                Iterator it = forkableFlinkMiniCluster.getTaskManagersAsJava().iterator();
                while (it.hasNext()) {
                    ((ActorRef) it.next()).tell(PoisonPill.getInstance(), (ActorRef) null);
                }
                for (int i2 = 0; i2 < 4; i2++) {
                    FailingMapper.TASK_TO_COORD_QUEUE.take();
                }
                for (int i3 = 0; i3 < 4; i3++) {
                    FailingMapper.COORD_TO_TASK_QUEUE.add(new Object());
                }
                thread.join();
                if (atomicReference.get() != null) {
                    Throwable th = (Throwable) atomicReference.get();
                    th.printStackTrace();
                    Assert.fail("Program execution caused an exception: " + th.getMessage());
                }
                if (actorSystem != null) {
                    actorSystem.shutdown();
                }
                if (forkableFlinkMiniCluster != null) {
                    forkableFlinkMiniCluster.stop();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
                Assert.fail(e2.getMessage());
                if (actorSystem != null) {
                    actorSystem.shutdown();
                }
                if (forkableFlinkMiniCluster != null) {
                    forkableFlinkMiniCluster.stop();
                }
            }
        } catch (Throwable th2) {
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            if (forkableFlinkMiniCluster != null) {
                forkableFlinkMiniCluster.stop();
            }
            throw th2;
        }
    }
}
