package org.apache.flink.test.recovery;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.StringWriter;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.class */
public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends TestLogger {
    /** Per-instance logger, named after the concrete (runtime) class of the test. */
    protected final Logger LOG = LoggerFactory.getLogger(getClass());
    /**
     * File name prefix of the "ready" marker files. The driver polls the coordination directory for
     * {@code ready_0 .. ready_(PARALLELISM - 1)} before it continues with the failure scenario.
     */
    protected static final String READY_MARKER_FILE_PREFIX = "ready_";
    /** Name of the marker file the driver creates in the coordination directory after killing a TaskManager. */
    protected static final String PROCEED_MARKER_FILE = "proceed";
    /** Marker file prefix that is not referenced in this class; presumably used by subclasses - confirm. */
    protected static final String FINISH_MARKER_FILE_PREFIX = "finish_";
    /** Number of "ready" marker files the driver waits for (one per parallel task, presumably - confirm). */
    protected static final int PARALLELISM = 4;

    /* loaded from: input_file:org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest$TaskManagerProcessEntryPoint.class */
    /**
     * Entry point of the child JVMs spawned by the test. Each child starts one TaskManager that
     * connects to a JobManager on {@code localhost} and then keeps its main thread parked until the
     * process is destroyed from the outside.
     */
    public static class TaskManagerProcessEntryPoint {
        private static final Logger LOG = LoggerFactory.getLogger(TaskManagerProcessEntryPoint.class);

        /**
         * Starts the TaskManager and blocks forever.
         *
         * <p>Any failure (including a missing or non-numeric port argument) is logged and terminates
         * the process with exit code 1.
         *
         * @param strArr command line arguments; element 0 is the JobManager RPC port
         */
        public static void main(String[] strArr) {
            try {
                int jobManagerPort = Integer.parseInt(strArr[0]);

                Configuration configuration = new Configuration();
                configuration.setString("jobmanager.rpc.address", "localhost");
                configuration.setInteger("jobmanager.rpc.port", jobManagerPort);
                // NOTE(review): the memory size reuses PARALLELISM (4); the two values look unrelated
                // and probably only coincide - confirm and consider a dedicated constant.
                configuration.setInteger("taskmanager.memory.size", AbstractTaskManagerProcessFailureRecoveryTest.PARALLELISM);
                configuration.setInteger("taskmanager.network.numberOfBuffers", 100);
                configuration.setInteger("taskmanager.numberOfTaskSlots", 2);
                configuration.setString("akka.ask.timeout", "100 s");

                TaskManager.selectNetworkInterfaceAndRunTaskManager(configuration, ResourceID.generate(), TaskManager.class);

                // Park the main thread for good. Object.wait() is allowed to return spuriously, so it
                // has to be re-entered in a loop; otherwise main() could return early and, if nothing
                // else keeps the JVM alive, the TaskManager process would vanish in the middle of the test.
                final Object lock = new Object();
                synchronized (lock) {
                    while (true) {
                        lock.wait();
                    }
                }
            } catch (Throwable th) {
                LOG.error("Failed to start TaskManager process", th);
                System.exit(1);
            }
        }
    }

    /**
     * Drives the process-failure scenario:
     * <ol>
     *   <li>starts a JobManager actor system in this JVM and two TaskManagers in child JVMs,</li>
     *   <li>runs {@link #testTaskManagerFailure(int, File)} in a separate thread and waits for the
     *       "ready" marker files,</li>
     *   <li>starts a third TaskManager, kills the first one, and creates the "proceed" marker file,</li>
     *   <li>waits for the program thread to finish and fails if it reported an error.</li>
     * </ol>
     *
     * <p>The test is skipped (returns normally) when no java executable can be located. All spawned
     * processes, the actor system and the coordination directory are released in a {@code finally}
     * block, so they are cleaned up on failure as well as on success.
     */
    @Test
    public void testTaskManagerProcessFailure() {
        final StringWriter processOutput1 = new StringWriter();
        final StringWriter processOutput2 = new StringWriter();
        final StringWriter processOutput3 = new StringWriter();

        // Everything that needs cleanup is tracked here so the finally block can see it.
        ActorSystem jmActorSystem = null;
        Process taskManagerProcess1 = null;
        Process taskManagerProcess2 = null;
        Process taskManagerProcess3 = null;
        File coordinateTempDir = null;

        try {
            // the test can only run if a java executable is available for the child JVMs
            String javaCommand = CommonTestUtils.getJavaCommandPath();
            if (javaCommand == null) {
                System.out.println("---- Skipping Process Failure test : Could not find java executable ----");
                return;
            }

            // logging configuration handed to the child processes
            File tempLogFile = File.createTempFile(getClass().getSimpleName() + "-", "-log4j.properties");
            tempLogFile.deleteOnExit();
            CommonTestUtils.printLog4jDebugConfig(tempLogFile);

            // the driver and the program coordinate through marker files in this directory
            coordinateTempDir = CommonTestUtils.createTempDirectory();

            final int jobManagerPort = NetUtils.getAvailablePort();
            Tuple2<String, Object> localAddress = new Tuple2<String, Object>("localhost", jobManagerPort);

            Configuration jmConfig = new Configuration();
            jmConfig.setString("akka.watch.heartbeat.interval", "1000 ms");
            jmConfig.setString("akka.watch.heartbeat.pause", "6 s");
            jmConfig.setInteger("akka.watch.threshold", 9);
            jmConfig.setString("restart-strategy.fixed-delay.delay", "10 s");
            jmConfig.setString("akka.ask.timeout", "100 s");

            jmActorSystem = AkkaUtils.createActorSystem(jmConfig, new Some<Tuple2<String, Object>>(localAddress));
            ActorRef jmActor = (ActorRef) JobManager.startJobManagerActors(
                    jmConfig,
                    jmActorSystem,
                    jmActorSystem.dispatcher(),
                    jmActorSystem.dispatcher(),
                    JobManager.class,
                    MemoryArchivist.class)._1();

            // command line of a TaskManager child JVM
            String[] command = {
                javaCommand,
                "-Dlog.level=DEBUG",
                "-Dlog4j.configuration=file:" + tempLogFile.getAbsolutePath(),
                "-Xms80m",
                "-Xmx80m",
                "-classpath",
                CommonTestUtils.getCurrentClasspath(),
                TaskManagerProcessEntryPoint.class.getName(),
                String.valueOf(jobManagerPort)
            };

            // start the first two TaskManagers and capture their stderr for diagnostics
            taskManagerProcess1 = new ProcessBuilder(command).start();
            new CommonTestUtils.PipeForwarder(taskManagerProcess1.getErrorStream(), processOutput1);
            taskManagerProcess2 = new ProcessBuilder(command).start();
            new CommonTestUtils.PipeForwarder(taskManagerProcess2.getErrorStream(), processOutput2);

            waitUntilNumTaskManagersAreRegistered(jmActor, 2, 120000L);

            // the anonymous thread needs final copies of what it captures
            final File coordinateDirClosure = coordinateTempDir;
            final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();

            Thread programTrigger = new Thread("Program Trigger") {
                @Override
                public void run() {
                    try {
                        testTaskManagerFailure(jobManagerPort, coordinateDirClosure);
                    } catch (Throwable t) {
                        t.printStackTrace();
                        errorRef.set(t);
                    }
                }
            };
            programTrigger.start();

            // wait until all "ready" marker files exist
            if (!waitForMarkerFiles(coordinateTempDir, READY_MARKER_FILE_PREFIX, PARALLELISM, 120000L)) {
                Throwable error = errorRef.get();
                if (error != null) {
                    // the program itself failed before reaching the ready state
                    error.printStackTrace();
                    Assert.fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
                } else {
                    // no error was reported, it simply took too long
                    Assert.fail("The tasks were not started within time (120000msecs)");
                }
            }

            // bring up the replacement TaskManager before killing one of the original two
            taskManagerProcess3 = new ProcessBuilder(command).start();
            new CommonTestUtils.PipeForwarder(taskManagerProcess3.getErrorStream(), processOutput3);
            waitUntilNumTaskManagersAreRegistered(jmActor, 3, 120000L);

            // kill the first TaskManager; null it out so the finally block does not destroy it again
            taskManagerProcess1.destroy();
            taskManagerProcess1 = null;

            // signal the program that it may proceed
            touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));

            // give the program at most 5 minutes to complete
            programTrigger.join(300000L);
            Assert.assertFalse("The program did not finish in time", programTrigger.isAlive());

            Throwable error = errorRef.get();
            if (error != null) {
                error.printStackTrace();
                Assert.fail("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage());
            }
        } catch (Exception e) {
            e.printStackTrace();
            printProcessLog("TaskManager 1", processOutput1.toString());
            printProcessLog("TaskManager 2", processOutput2.toString());
            printProcessLog("TaskManager 3", processOutput3.toString());
            Assert.fail(e.getMessage());
        } catch (Error e) {
            // includes the AssertionErrors raised by the Assert.* calls above
            e.printStackTrace();
            printProcessLog("TaskManager 1", processOutput1.toString());
            printProcessLog("TaskManager 2", processOutput2.toString());
            printProcessLog("TaskManager 3", processOutput3.toString());
            throw e;
        } finally {
            if (taskManagerProcess1 != null) {
                taskManagerProcess1.destroy();
            }
            if (taskManagerProcess2 != null) {
                taskManagerProcess2.destroy();
            }
            if (taskManagerProcess3 != null) {
                taskManagerProcess3.destroy();
            }
            if (jmActorSystem != null) {
                jmActorSystem.shutdown();
            }
            if (coordinateTempDir != null) {
                try {
                    FileUtils.deleteDirectory(coordinateTempDir);
                } catch (Throwable ignored) {
                    // best effort: a leftover temp directory must not change the test outcome
                }
            }
        }
    }

    /**
     * The program under test, supplied by the concrete subclass. It is invoked by
     * {@link #testTaskManagerProcessFailure()} on a separate "Program Trigger" thread; anything it
     * throws is recorded there and reported as a test failure.
     *
     * @param i the port of the JobManager started by the test driver
     * @param file the coordination directory in which the driver looks for the "ready" marker files
     *     and creates the "proceed" marker file
     * @throws Exception if the program fails
     */
    public abstract void testTaskManagerFailure(int i, File file) throws Exception;

    /**
     * Polls the JobManager until it reports exactly the expected number of registered TaskManagers.
     *
     * <p>Each polling round asks the JobManager with a 10 ms timeout and, if the round finished
     * faster than that, sleeps for the remainder of the 10 ms interval. A timed-out ask is simply
     * retried. The method fails the test once the overall deadline has passed, or immediately if
     * the reply is not an {@link Integer}.
     *
     * @param actorRef the JobManager actor to ask
     * @param i the number of TaskManagers to wait for
     * @param j the maximum time to wait, in milliseconds
     * @throws Exception if asking the JobManager fails for a reason other than a timeout, or if the
     *     waiting thread is interrupted
     */
    protected void waitUntilNumTaskManagersAreRegistered(ActorRef actorRef, int i, long j) throws Exception {
        final long pollIntervalNanos = 10000000L;
        final long deadlineNanos = System.nanoTime() + (j * 1000000);

        long roundStart;
        while ((roundStart = System.nanoTime()) < deadlineNanos) {
            FiniteDuration askTimeout = new FiniteDuration(pollIntervalNanos, TimeUnit.NANOSECONDS);

            try {
                Object reply = Await.result(
                        Patterns.ask(actorRef, JobManagerMessages.getRequestNumberRegisteredTaskManager(), new Timeout(askTimeout)),
                        askTimeout);
                int registered = ((Integer) reply).intValue();
                if (registered == i) {
                    return;
                }
            } catch (TimeoutException ignored) {
                // no answer within this round; try again
            } catch (ClassCastException e) {
                Assert.fail("Wrong response: " + e.getMessage());
            }

            // pace the polling: sleep for whatever is left of the current interval
            long elapsedNanos = System.nanoTime() - roundStart;
            long leftoverMillis = (pollIntervalNanos - elapsedNanos) / 1000000;
            if (leftoverMillis > 0) {
                Thread.sleep(leftoverMillis);
            }
        }

        Assert.fail("The TaskManagers did not register within the expected time (" + j + "msecs)");
    }

    /**
     * Prints the captured output of a spawned process to standard out, framed by a banner.
     * Nothing is printed when there is no output.
     *
     * @param str the display name of the process, used in the banner
     * @param str2 the captured output; may be {@code null} or empty
     */
    protected static void printProcessLog(String str, String str2) {
        boolean hasOutput = str2 != null && str2.length() > 0;
        if (hasOutput) {
            final String rule = "-----------------------------------------";

            System.out.println(rule);
            System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + str);
            System.out.println(rule);
            System.out.println(str2);
            System.out.println(rule);
            System.out.println("\t\tEND SPAWNED PROCESS LOG");
            System.out.println(rule);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the file as an empty file if it does not exist yet and sets its last-modified time
     * to the current time.
     *
     * @param file the file to touch
     * @throws IOException if the file cannot be created or its modification time cannot be set
     */
    public static void touchFile(File file) throws IOException {
        boolean missing = !file.exists();
        if (missing) {
            // opening and immediately closing an output stream creates the empty file
            FileOutputStream creator = new FileOutputStream(file);
            creator.close();
        }

        boolean stamped = file.setLastModified(System.currentTimeMillis());
        if (stamped) {
            return;
        }
        throw new IOException("Could not touch the file.");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Waits until the files {@code str + 0} through {@code str + (i - 1)} all exist in the given
     * directory, checking every 10 ms.
     *
     * @param file the directory in which the marker files are expected
     * @param str the file name prefix of the marker files
     * @param i the number of marker files to wait for
     * @param j the maximum time to wait, in milliseconds
     * @return {@code true} if all marker files were found before the timeout elapsed,
     *     {@code false} otherwise
     * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
     *     interrupted while waiting; the thread's interrupt status is restored before throwing
     */
    public static boolean waitForMarkerFiles(File file, String str, int i, long j) {
        long now = System.currentTimeMillis();
        final long deadline = now + j;

        while (now < deadline) {
            boolean allPresent = true;
            for (int index = 0; index < i; index++) {
                if (!new File(file, str + index).exists()) {
                    allPresent = false;
                    break;
                }
            }
            if (allPresent) {
                return true;
            }

            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                // Keep the interruption visible to callers: the wrapping RuntimeException alone
                // would otherwise silently clear the thread's interrupt status.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            now = System.currentTimeMillis();
        }
        return false;
    }
}
