package org.apache.flink.test.recovery;

import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.MemoryExecutionGraphInfoStore;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.heartbeat.HeartbeatServicesImpl;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.security.token.NoOpDelegationTokenManager;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.BlobServerExtension;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.webmonitor.retriever.impl.VoidMetricQueryServiceRetriever;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.test.recovery.utils.TaskExecutorProcessEntryPoint;
import org.apache.flink.test.util.TestProcessBuilder;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.ExceptionUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/test/recovery/ProcessFailureCancelingITCase.class */
/**
 * Integration test verifying that a submitted job fails (instead of hanging) when the external
 * process that runs its tasks is killed.
 *
 * <p>The test starts a dispatcher/resource-manager component inside the test JVM, spawns a second
 * process whose main class is {@link TaskExecutorProcessEntryPoint}, submits a job whose map
 * function blocks forever, destroys the spawned process once a task has printed the deployment
 * marker, and finally asserts that the job submission terminated with a {@link
 * ProgramInvocationException} within {@link #TIMEOUT}.
 */
class ProcessFailureCancelingITCase {

    /** Line printed to stdout by the blocking map function once it starts running. */
    private static final String TASK_DEPLOYED_MARKER = "deployed";

    /** Upper bound for both the deployment wait and the wait for the job to fail. */
    private static final Duration TIMEOUT = Duration.ofMinutes(2);

    @RegisterExtension
    public static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE =
            TestingUtils.defaultExecutorExtension();

    @RegisterExtension
    public final EachCallbackWrapper<BlobServerExtension> blobServerExtensionWrapper =
            new EachCallbackWrapper<>(new BlobServerExtension());

    @RegisterExtension
    public final EachCallbackWrapper<ZooKeeperExtension> zooKeeperExtensionWrapper =
            new EachCallbackWrapper<>(new ZooKeeperExtension());

    @TempDir
    public Path temporaryFolder;

    /**
     * Kills the spawned TaskManager process while a job is running and checks that the submitting
     * client observes a {@link ProgramInvocationException} in time.
     *
     * @throws Throwable any failure of the test itself; if the job submission thread recorded a
     *     failure, that failure is thrown with the test failure attached as suppressed
     */
    @Test
    void testCancelingOnProcessFailure() throws Throwable {
        Assumptions.assumeTrue(
                CommonTestUtils.getJavaCommandPath() != null,
                "---- Skipping Process Failure test : Could not find java executable ----");

        final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();
        final Configuration config = createConfiguration();

        // Start the RPC service on an arbitrary free port and publish that port in the
        // configuration that is later handed to the spawned process.
        final RpcService rpcService =
                RpcSystem.load().remoteServiceBuilder(config, "localhost", "0").createAndStart();
        config.setInteger(JobManagerOptions.PORT, rpcService.getPort());

        final DefaultDispatcherResourceManagerComponentFactory componentFactory =
                DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
                        StandaloneResourceManagerFactory.getInstance());
        final ScheduledExecutorService ioExecutor = EXECUTOR_RESOURCE.getExecutor();
        final HighAvailabilityServices haServices =
                HighAvailabilityServicesUtils.createHighAvailabilityServices(
                        config,
                        ioExecutor,
                        AddressResolution.NO_ADDRESS_RESOLUTION,
                        RpcSystem.load(),
                        NoOpFatalErrorHandler.INSTANCE);

        // Failure observed by the job submission thread, if any.
        final AtomicReference<Throwable> jobSubmissionFailure = new AtomicReference<>();

        // Both references stay null until the corresponding resource exists, so that the
        // cleanup below only touches what was actually created.
        TestProcessBuilder.TestProcess taskManagerProcess = null;
        DispatcherResourceManagerComponent dispatcherResourceManagerComponent = null;

        try {
            dispatcherResourceManagerComponent =
                    componentFactory.create(
                            config,
                            ResourceID.generate(),
                            ioExecutor,
                            rpcService,
                            haServices,
                            this.blobServerExtensionWrapper.getCustomExtension().getBlobServer(),
                            new HeartbeatServicesImpl(100L, 10000L, 2),
                            new NoOpDelegationTokenManager(),
                            NoOpMetricRegistry.INSTANCE,
                            new MemoryExecutionGraphInfoStore(),
                            VoidMetricQueryServiceRetriever.INSTANCE,
                            Collections.emptySet(),
                            fatalErrorHandler);

            final TestProcessBuilder taskManagerProcessBuilder =
                    new TestProcessBuilder(TaskExecutorProcessEntryPoint.class.getName());
            taskManagerProcessBuilder.addConfigAsMainClassArgs(config);
            taskManagerProcess = taskManagerProcessBuilder.start();

            // The job never finishes on its own, so it is submitted from a separate thread.
            final Thread programThread =
                    new Thread(() -> submitBlockingJob(config, jobSubmissionFailure));
            programThread.start();

            waitUntilAtLeastOneTaskHasBeenDeployed(taskManagerProcess);

            // Kill the spawned process while the job is running; clear the reference so the
            // cleanup in the finally block does not destroy it a second time.
            taskManagerProcess.destroy();
            taskManagerProcess = null;

            programThread.join(TIMEOUT.toMillis());

            Assertions.assertThat(programThread.isAlive())
                    .withFailMessage("The program did not cancel in time")
                    .isFalse();
            Assertions.assertThat(jobSubmissionFailure.get())
                    .withFailMessage("The program did not fail properly")
                    .isInstanceOf(ProgramInvocationException.class);
        } catch (Exception | Error e) {
            // Dump the output of the still-running spawned process to help diagnose the failure.
            if (taskManagerProcess != null) {
                printOutput("TaskManager OUT", taskManagerProcess.getProcessOutput().toString());
                printOutput("TaskManager ERR", taskManagerProcess.getErrorOutput().toString());
            }
            throw ExceptionUtils.firstOrSuppressed(e, jobSubmissionFailure.get());
        } finally {
            // Runs exactly once, on success and on failure alike.
            if (taskManagerProcess != null) {
                taskManagerProcess.destroy();
            }
            if (dispatcherResourceManagerComponent != null) {
                dispatcherResourceManagerComponent
                        .stopApplication(ApplicationStatus.SUCCEEDED, null)
                        .get();
            }
            fatalErrorHandler.rethrowError();
            RpcUtils.terminateRpcService(rpcService);
            haServices.closeAndCleanupAllData();
        }
    }

    /**
     * Builds the configuration shared by the in-JVM components and the spawned process: ZooKeeper
     * high availability backed by a fresh folder below {@link #temporaryFolder}, two task slots,
     * fixed memory sizes and a REST port of {@code 0}.
     */
    private Configuration createConfiguration() throws Exception {
        final Configuration config = new Configuration();
        config.setString(JobManagerOptions.ADDRESS, "localhost");
        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofSeconds(100L));
        config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
        config.setString(
                HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM,
                this.zooKeeperExtensionWrapper.getCustomExtension().getConnectString());
        config.setString(
                HighAvailabilityOptions.HA_STORAGE_PATH,
                TempDirUtils.newFolder(this.temporaryFolder).getAbsolutePath());
        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("4m"));
        config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("3200k"));
        config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("3200k"));
        config.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.parse("128m"));
        config.set(TaskManagerOptions.CPU_CORES, 1.0);
        config.setInteger(RestOptions.PORT, 0);
        return config;
    }

    /**
     * Submits a job with parallelism 2 and no restarts whose map function prints {@link
     * #TASK_DEPLOYED_MARKER} and then blocks forever. Any throwable raised while building or
     * executing the job is stored in {@code failure} instead of being propagated.
     */
    private static void submitBlockingJob(
            Configuration config, AtomicReference<Throwable> failure) {
        try {
            final ExecutionEnvironment env =
                    ExecutionEnvironment.createRemoteEnvironment("localhost", 1337, config);
            env.setParallelism(2);
            env.setRestartStrategy(RestartStrategies.noRestart());

            env.generateSequence(0L, Long.MAX_VALUE)
                    .map(
                            new MapFunction<Long, Long>() {
                                @Override
                                public Long map(Long value) throws Exception {
                                    synchronized (this) {
                                        System.out.println(TASK_DEPLOYED_MARKER);
                                        // Nothing ever notifies this monitor: the task is
                                        // meant to hang until it is torn down.
                                        wait();
                                    }
                                    return 0L;
                                }
                            })
                    .output(new DiscardingOutputFormat<>());

            env.execute();
        } catch (Throwable t) {
            failure.set(t);
        }
    }

    /**
     * Blocks until the captured standard output of {@code testProcess} contains {@link
     * #TASK_DEPLOYED_MARKER}.
     *
     * @throws TimeoutException if the marker does not show up within {@link #TIMEOUT}
     */
    private static void waitUntilAtLeastOneTaskHasBeenDeployed(
            TestProcessBuilder.TestProcess testProcess)
            throws InterruptedException, TimeoutException {
        // Fully qualified on purpose: the runtime CommonTestUtils is the one imported here.
        org.apache.flink.core.testutils.CommonTestUtils.waitUtil(
                () -> testProcess.getProcessOutput().toString().contains(TASK_DEPLOYED_MARKER),
                TIMEOUT,
                "Timed out waiting for the marker '"
                        + TASK_DEPLOYED_MARKER
                        + "' in the output of the spawned process.");
    }

    /**
     * Prints {@code log} to standard output, framed by a banner naming {@code processName}. Does
     * nothing when {@code log} is {@code null} or empty.
     */
    private static void printOutput(String processName, String log) {
        if (log == null || log.isEmpty()) {
            return;
        }
        System.out.println("-----------------------------------------");
        System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + processName);
        System.out.println("-----------------------------------------");
        System.out.println(log);
        System.out.println("-----------------------------------------");
        System.out.println("\t\tEND SPAWNED PROCESS LOG");
        System.out.println("-----------------------------------------");
    }
}
