/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringWriter;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.net.NetUtils;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public abstract class AbstractProcessFailureRecoveryTest {
    protected static final String READY_MARKER_FILE_PREFIX = "ready_";
    protected static final String PROCEED_MARKER_FILE = "proceed";
    protected static final int PARALLELISM = 4;

    /*
     * Loose catch block
     */
    @Test
    public void testTaskManagerProcessFailure() {
        block37: {
            File coordinateTempDir;
            block35: {
                String javaCommand;
                Process taskManagerProcess3;
                Process taskManagerProcess2;
                Process taskManagerProcess1;
                ActorSystem jmActorSystem;
                StringWriter processOutput3;
                StringWriter processOutput2;
                StringWriter processOutput1;
                block33: {
                    block34: {
                        processOutput1 = new StringWriter();
                        processOutput2 = new StringWriter();
                        processOutput3 = new StringWriter();
                        jmActorSystem = null;
                        taskManagerProcess1 = null;
                        taskManagerProcess2 = null;
                        taskManagerProcess3 = null;
                        coordinateTempDir = null;
                        javaCommand = CommonTestUtils.getJavaCommandPath();
                        if (javaCommand != null) break block33;
                        System.out.println("---- Skipping Process Failure test : Could not find java executable ----");
                        if (taskManagerProcess1 != null) {
                            taskManagerProcess1.destroy();
                        }
                        if (taskManagerProcess2 != null) {
                            taskManagerProcess2.destroy();
                        }
                        if (taskManagerProcess3 != null) {
                            taskManagerProcess3.destroy();
                        }
                        if (jmActorSystem == null) break block34;
                        jmActorSystem.shutdown();
                    }
                    if (coordinateTempDir != null) {
                        try {
                            FileUtils.deleteDirectory(coordinateTempDir);
                        }
                        catch (Throwable throwable) {
                            // empty catch block
                        }
                    }
                    return;
                }
                try {
                    File tempLogFile = File.createTempFile(this.getClass().getSimpleName() + "-", "-log4j.properties");
                    tempLogFile.deleteOnExit();
                    CommonTestUtils.printLog4jDebugConfig((File)tempLogFile);
                    coordinateTempDir = AbstractProcessFailureRecoveryTest.createTempDirectory();
                    final int jobManagerPort = NetUtils.getAvailablePort();
                    Tuple2 localAddress = new Tuple2((Object)"localhost", (Object)jobManagerPort);
                    Configuration jmConfig = new Configuration();
                    jmConfig.setString("akka.watch.heartbeat.interval", "1000 ms");
                    jmConfig.setString("akka.watch.heartbeat.pause", "6 s");
                    jmConfig.setInteger("akka.watch.threshold", 9);
                    jmConfig.setString("execution-retries.delay", "10 s");
                    jmActorSystem = AkkaUtils.createActorSystem((Configuration)jmConfig, (Option)new Some((Object)localAddress));
                    ActorRef jmActor = (ActorRef)JobManager.startJobManagerActors((Configuration)jmConfig, (ActorSystem)jmActorSystem, (StreamingMode)StreamingMode.STREAMING)._1();
                    String[] command = new String[]{javaCommand, "-Dlog.level=DEBUG", "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(), "-Xms80m", "-Xmx80m", "-classpath", CommonTestUtils.getCurrentClasspath(), TaskManagerProcessEntryPoint.class.getName(), String.valueOf(jobManagerPort)};
                    taskManagerProcess1 = new ProcessBuilder(command).start();
                    new PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
                    taskManagerProcess2 = new ProcessBuilder(command).start();
                    new PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);
                    this.waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000L);
                    final File coordinateDirClosure = coordinateTempDir;
                    final Throwable[] errorRef = new Throwable[1];
                    Thread programTrigger = new Thread("Program Trigger"){

                        @Override
                        public void run() {
                            try {
                                AbstractProcessFailureRecoveryTest.this.testProgram(jobManagerPort, coordinateDirClosure);
                            }
                            catch (Throwable t) {
                                t.printStackTrace();
                                errorRef[0] = t;
                            }
                        }
                    };
                    programTrigger.start();
                    AbstractProcessFailureRecoveryTest.waitForMarkerFiles(coordinateTempDir, 4, 20000L);
                    taskManagerProcess3 = new ProcessBuilder(command).start();
                    new PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
                    this.waitUntilNumTaskManagersAreRegistered(jmActor, 3, 120000L);
                    taskManagerProcess1.destroy();
                    taskManagerProcess1 = null;
                    AbstractProcessFailureRecoveryTest.touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
                    programTrigger.join(300000L);
                    Assert.assertFalse((String)"The program did not finish in time", (boolean)programTrigger.isAlive());
                    if (errorRef[0] != null) {
                        Throwable error = errorRef[0];
                        error.printStackTrace();
                        Assert.fail((String)("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage()));
                    }
                    if (taskManagerProcess1 != null) {
                        taskManagerProcess1.destroy();
                    }
                    if (taskManagerProcess2 != null) {
                        taskManagerProcess2.destroy();
                    }
                    if (taskManagerProcess3 != null) {
                        taskManagerProcess3.destroy();
                    }
                    if (jmActorSystem == null) break block35;
                }
                catch (Exception e2222222222) {
                    block36: {
                        e2222222222.printStackTrace();
                        AbstractProcessFailureRecoveryTest.printProcessLog("TaskManager 1", processOutput1.toString());
                        AbstractProcessFailureRecoveryTest.printProcessLog("TaskManager 2", processOutput2.toString());
                        AbstractProcessFailureRecoveryTest.printProcessLog("TaskManager 3", processOutput3.toString());
                        Assert.fail((String)e2222222222.getMessage());
                        if (taskManagerProcess1 != null) {
                            taskManagerProcess1.destroy();
                        }
                        if (taskManagerProcess2 != null) {
                            taskManagerProcess2.destroy();
                        }
                        if (taskManagerProcess3 != null) {
                            taskManagerProcess3.destroy();
                        }
                        if (jmActorSystem == null) break block36;
                        jmActorSystem.shutdown();
                    }
                    if (coordinateTempDir != null) {
                        try {
                            FileUtils.deleteDirectory((File)coordinateTempDir);
                        }
                        catch (Throwable e2222222222) {}
                    }
                    break block37;
                }
                catch (Error e) {
                    e.printStackTrace();
                    AbstractProcessFailureRecoveryTest.printProcessLog("TaskManager 1", processOutput1.toString());
                    AbstractProcessFailureRecoveryTest.printProcessLog("TaskManager 2", processOutput2.toString());
                    AbstractProcessFailureRecoveryTest.printProcessLog("TaskManager 3", processOutput3.toString());
                    throw e;
                    {
                        catch (Throwable throwable) {
                            if (taskManagerProcess1 != null) {
                                taskManagerProcess1.destroy();
                            }
                            if (taskManagerProcess2 != null) {
                                taskManagerProcess2.destroy();
                            }
                            if (taskManagerProcess3 != null) {
                                taskManagerProcess3.destroy();
                            }
                            if (jmActorSystem != null) {
                                jmActorSystem.shutdown();
                            }
                            if (coordinateTempDir != null) {
                                try {
                                    FileUtils.deleteDirectory(coordinateTempDir);
                                }
                                catch (Throwable throwable2) {
                                    // empty catch block
                                }
                            }
                            throw throwable;
                        }
                    }
                }
                jmActorSystem.shutdown();
            }
            if (coordinateTempDir != null) {
                try {
                    FileUtils.deleteDirectory((File)coordinateTempDir);
                }
                catch (Throwable javaCommand) {}
            }
        }
    }

    public abstract void testProgram(int var1, File var2) throws Exception;

    protected void waitUntilNumTaskManagersAreRegistered(ActorRef jobManager, int numExpected, long maxDelay) throws Exception {
        long deadline = System.currentTimeMillis() + maxDelay;
        while (true) {
            long remaining;
            if ((remaining = deadline - System.currentTimeMillis()) <= 0L) {
                Assert.fail((String)("The TaskManagers did not register within the expected time (" + maxDelay + "msecs)"));
            }
            FiniteDuration timeout = new FiniteDuration(remaining, TimeUnit.MILLISECONDS);
            try {
                Future result = Patterns.ask((ActorRef)jobManager, (Object)JobManagerMessages.getRequestNumberRegisteredTaskManager(), (Timeout)new Timeout(timeout));
                Integer numTMs = (Integer)Await.result((Awaitable)result, (Duration)timeout);
                if (numTMs != numExpected) continue;
            }
            catch (TimeoutException result) {
                continue;
            }
            catch (ClassCastException e) {
                Assert.fail((String)("Wrong response: " + e.getMessage()));
                continue;
            }
            break;
        }
    }

    protected static void printProcessLog(String processName, String log) {
        if (log == null || log.length() == 0) {
            return;
        }
        System.out.println("-----------------------------------------");
        System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
        System.out.println("-----------------------------------------");
        System.out.println(log);
        System.out.println("-----------------------------------------");
        System.out.println("\t\tEND SPAWNED PROCESS LOG");
        System.out.println("-----------------------------------------");
    }

    protected static File createTempDirectory() throws IOException {
        File tempDir = new File(System.getProperty("java.io.tmpdir"));
        for (int i = 0; i < 10; ++i) {
            File dir = new File(tempDir, UUID.randomUUID().toString());
            if (!dir.exists() && dir.mkdirs()) {
                return dir;
            }
            System.err.println("Could not use temporary directory " + dir.getAbsolutePath());
        }
        throw new IOException("Could not create temporary file directory");
    }

    protected static void touchFile(File file) throws IOException {
        if (!file.exists()) {
            new FileOutputStream(file).close();
        }
        if (!file.setLastModified(System.currentTimeMillis())) {
            throw new IOException("Could not touch the file.");
        }
    }

    protected static void waitForMarkerFiles(File basedir, int num, long timeout) {
        long now = System.currentTimeMillis();
        long deadline = now + timeout;
        while (now < deadline) {
            boolean allFound = true;
            for (int i = 0; i < num; ++i) {
                File nextToCheck = new File(basedir, READY_MARKER_FILE_PREFIX + i);
                if (nextToCheck.exists()) continue;
                allFound = false;
                break;
            }
            if (allFound) {
                return;
            }
            try {
                Thread.sleep(10L);
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            now = System.currentTimeMillis();
        }
        Assert.fail((String)("The tasks were not started within time (" + timeout + "msecs)"));
    }

    protected static class PipeForwarder
    extends Thread {
        private final StringWriter target;
        private final InputStream source;

        public PipeForwarder(InputStream source, StringWriter target) {
            super("Pipe Forwarder");
            this.setDaemon(true);
            this.source = source;
            this.target = target;
            this.start();
        }

        @Override
        public void run() {
            try {
                int next;
                while ((next = this.source.read()) != -1) {
                    this.target.write(next);
                }
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }

    public static class TaskManagerProcessEntryPoint {
        private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public static void main(String[] args) {
            try {
                Object lock;
                int jobManagerPort = Integer.parseInt(args[0]);
                Configuration cfg = new Configuration();
                cfg.setString("jobmanager.rpc.address", "localhost");
                cfg.setInteger("jobmanager.rpc.port", jobManagerPort);
                cfg.setInteger("taskmanager.memory.size", 4);
                cfg.setInteger("taskmanager.network.numberOfBuffers", 100);
                cfg.setInteger("taskmanager.numberOfTaskSlots", 2);
                TaskManager.selectNetworkInterfaceAndRunTaskManager((Configuration)cfg, (StreamingMode)StreamingMode.STREAMING, TaskManager.class);
                Object object = lock = new Object();
                synchronized (object) {
                    lock.wait();
                }
            }
            catch (Throwable t) {
                LOG.error("Failed to start TaskManager process", t);
                System.exit(1);
            }
        }
    }
}

