/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.File;
import java.io.StringWriter;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.test.recovery.AbstractProcessFailureRecoveryTest;
import org.junit.Assert;
import org.junit.Test;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class ProcessFailureCancelingITCase {
    @Test
    public void testCancelingOnProcessFailure() {
        StringWriter processOutput = new StringWriter();
        ActorSystem jmActorSystem = null;
        Process taskManagerProcess = null;
        try {
            String javaCommand = CommonTestUtils.getJavaCommandPath();
            if (javaCommand == null) {
                System.out.println("---- Skipping Process Failure test : Could not find java executable ----");
                return;
            }
            File tempLogFile = File.createTempFile(this.getClass().getSimpleName() + "-", "-log4j.properties");
            tempLogFile.deleteOnExit();
            CommonTestUtils.printLog4jDebugConfig((File)tempLogFile);
            final int jobManagerPort = NetUtils.getAvailablePort();
            Tuple2 localAddress = new Tuple2((Object)"localhost", (Object)jobManagerPort);
            Configuration jmConfig = new Configuration();
            jmConfig.setString("akka.watch.heartbeat.interval", "5 s");
            jmConfig.setString("akka.watch.heartbeat.pause", "2000 s");
            jmConfig.setInteger("akka.watch.threshold", 10);
            jmConfig.setString("akka.ask.timeout", "10 s");
            jmActorSystem = AkkaUtils.createActorSystem((Configuration)jmConfig, (Option)new Some((Object)localAddress));
            ActorRef jmActor = (ActorRef)JobManager.startJobManagerActors((Configuration)jmConfig, (ActorSystem)jmActorSystem, (StreamingMode)StreamingMode.BATCH_ONLY)._1();
            String[] command = new String[]{javaCommand, "-Dlog.level=DEBUG", "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), "-Xms80m", "-Xmx80m", "-classpath", CommonTestUtils.getCurrentClasspath(), AbstractProcessFailureRecoveryTest.TaskManagerProcessEntryPoint.class.getName(), String.valueOf(jobManagerPort)};
            taskManagerProcess = new ProcessBuilder(command).start();
            new AbstractProcessFailureRecoveryTest.PipeForwarder(taskManagerProcess.getErrorStream(), processOutput);
            this.waitUntilNumTaskManagersAreRegistered(jmActor, 1, 30000L);
            final Throwable[] errorRef = new Throwable[1];
            Runnable programRunner = new Runnable(){

                @Override
                public void run() {
                    try {
                        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)jobManagerPort, (String[])new String[0]);
                        env.setParallelism(2);
                        env.setNumberOfExecutionRetries(0);
                        env.getConfig().disableSysoutLogging();
                        env.generateSequence(0L, Long.MAX_VALUE).map((MapFunction)new MapFunction<Long, Long>(){

                            /*
                             * WARNING - Removed try catching itself - possible behaviour change.
                             */
                            public Long map(Long value) throws Exception {
                                1 var2_2 = this;
                                synchronized (var2_2) {
                                    this.wait();
                                }
                                return 0L;
                            }
                        }).output((OutputFormat)new DiscardingOutputFormat());
                        env.execute();
                    }
                    catch (Throwable t) {
                        errorRef[0] = t;
                    }
                }
            };
            Thread programThread = new Thread(programRunner);
            taskManagerProcess.destroy();
            taskManagerProcess = null;
            programThread.start();
            this.cancelRunningJob(jmActor);
            programThread.join(120000L);
            Assert.assertFalse((String)"The program did not cancel in time (2 minutes)", (boolean)programThread.isAlive());
            Throwable error = errorRef[0];
            Assert.assertNotNull((String)"The program did not fail properly", (Object)error);
            Assert.assertTrue((boolean)(error instanceof ProgramInvocationException));
        }
        catch (Exception e) {
            e.printStackTrace();
            this.printProcessLog("TaskManager", processOutput.toString());
            Assert.fail((String)e.getMessage());
        }
        catch (Error e) {
            e.printStackTrace();
            this.printProcessLog("TaskManager 1", processOutput.toString());
            throw e;
        }
        finally {
            if (taskManagerProcess != null) {
                taskManagerProcess.destroy();
            }
            if (jmActorSystem != null) {
                jmActorSystem.shutdown();
            }
        }
    }

    private void cancelRunningJob(ActorRef jobManager) throws Exception {
        FiniteDuration askTimeout = new FiniteDuration(10L, TimeUnit.SECONDS);
        long deadline = System.currentTimeMillis() + 30000L;
        JobID jobId = null;
        do {
            List jobs;
            Object result;
            Future response = Patterns.ask((ActorRef)jobManager, (Object)JobManagerMessages.getRequestRunningJobsStatus(), (Timeout)new Timeout(askTimeout));
            try {
                result = Await.result((Awaitable)response, (Duration)askTimeout);
            }
            catch (Exception e) {
                throw new Exception("Could not retrieve running jobs from the JobManager.", e);
            }
            if (!(result instanceof JobManagerMessages.RunningJobsStatus) || (jobs = ((JobManagerMessages.RunningJobsStatus)result).getStatusMessages()).size() != 1) continue;
            jobId = ((JobStatusMessage)jobs.get(0)).getJobId();
            break;
        } while (System.currentTimeMillis() < deadline);
        if (jobId == null) {
            return;
        }
        jobManager.tell((Object)new JobManagerMessages.CancelJob(jobId), ActorRef.noSender());
    }

    private void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay) throws Exception {
        long deadline = System.currentTimeMillis() + maxDelay;
        while (true) {
            long remaining;
            if ((remaining = deadline - System.currentTimeMillis()) <= 0L) {
                Assert.fail((String)("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)"));
            }
            FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
            try {
                Future result = Patterns.ask((ActorRef)jobManager, (Object)JobManagerMessages.getRequestNumberRegisteredTaskManager(), (Timeout)new Timeout(timeout));
                Integer numTMs = (Integer)Await.result((Awaitable)result, (Duration)timeout);
                if (numTMs != numExpected) continue;
            }
            catch (TimeoutException result) {
                continue;
            }
            catch (ClassCastException e) {
                Assert.fail((String)("Wrong response: " + e.getMessage()));
                continue;
            }
            break;
        }
    }

    private void printProcessLog(String processName, String log) {
        if (log == null || log.length() == 0) {
            return;
        }
        System.out.println("-----------------------------------------");
        System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
        System.out.println("-----------------------------------------");
        System.out.println(log);
        System.out.println("-----------------------------------------");
        System.out.println("\t\tEND SPAWNED PROCESS LOG");
        System.out.println("-----------------------------------------");
    }
}

