/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.recovery;

import java.io.File;
import java.io.Serializable;
import java.nio.file.Path;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.testutils.DispatcherProcess;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.test.recovery.AbstractTaskManagerProcessFailureRecoveryTest;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

@ExtendWith(value={NoOpTestExtension.class})
class JobManagerHAProcessFailureRecoveryITCase {
    private final ZooKeeperExtension zooKeeperExtension = new ZooKeeperExtension();
    @RegisterExtension
    final EachCallbackWrapper<ZooKeeperExtension> zooKeeperResource = new EachCallbackWrapper((CustomExtension)this.zooKeeperExtension);
    private static final Duration TEST_TIMEOUT = Duration.ofMinutes(5L);
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    @TempDir
    private Path tempDir;
    protected static final String READY_MARKER_FILE_PREFIX = "ready_";
    protected static final String FINISH_MARKER_FILE_PREFIX = "finish_";
    protected static final String PROCEED_MARKER_FILE = "proceed";
    protected static final int PARALLELISM = 4;

    JobManagerHAProcessFailureRecoveryITCase() {
    }

    private void testJobManagerFailure(String zkQuorum, final File coordinateDir, File zookeeperStoragePath) throws Exception {
        Configuration config = new Configuration();
        config.set(HighAvailabilityOptions.HA_MODE, (Object)"ZOOKEEPER");
        config.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, (Object)zkQuorum);
        config.set(HighAvailabilityOptions.HA_STORAGE_PATH, (Object)zookeeperStoragePath.getAbsolutePath());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment((String)"leader", (int)1, (Configuration)config, (String[])new String[0]);
        env.setParallelism(4);
        Configuration configuration = new Configuration();
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY, (Object)"fixed-delay");
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, (Object)1);
        configuration.set(RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY, (Object)Duration.ofMillis(0L));
        env.configure((ReadableConfig)configuration, Thread.currentThread().getContextClassLoader());
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        long numElements = 100000L;
        SingleOutputStreamOperator result = env.fromSequence(1L, 100000L).rebalance().map((MapFunction)new RichMapFunction<Long, Long>(){
            private final File proceedFile;
            private boolean markerCreated;
            private boolean checkForProceedFile;
            {
                this.proceedFile = new File(coordinateDir, JobManagerHAProcessFailureRecoveryITCase.PROCEED_MARKER_FILE);
                this.markerCreated = false;
                this.checkForProceedFile = true;
            }

            public Long map(Long value) throws Exception {
                if (!this.markerCreated) {
                    int taskIndex = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
                    AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(coordinateDir, JobManagerHAProcessFailureRecoveryITCase.READY_MARKER_FILE_PREFIX + taskIndex));
                    this.markerCreated = true;
                }
                if (this.checkForProceedFile) {
                    if (this.proceedFile.exists()) {
                        this.checkForProceedFile = false;
                    } else {
                        Thread.sleep(100L);
                    }
                }
                return value;
            }
        }).setParallelism(4).keyBy((KeySelector & Serializable)x -> 0).reduce((ReduceFunction)new ReduceFunction<Long>(){

            public Long reduce(Long value1, Long value2) {
                return value1 + value2;
            }
        }).setParallelism(1).flatMap((FlatMapFunction)new RichFlatMapFunction<Long, Long>(){

            public void flatMap(Long value, Collector<Long> out) throws Exception {
                Assertions.assertThat((long)value).isEqualTo(5000050000L);
                int taskIndex = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
                AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(coordinateDir, JobManagerHAProcessFailureRecoveryITCase.FINISH_MARKER_FILE_PREFIX + taskIndex));
            }
        }).setParallelism(1);
        result.sinkTo((Sink)new DiscardingSink());
        env.execute();
    }

    @TestTemplate
    void testDispatcherProcessFailure() throws Exception {
        RpcService rpcService;
        File coordinateTempDir;
        LeaderRetrievalService leaderRetrievalService;
        HighAvailabilityServices highAvailabilityServices;
        TaskManagerRunner[] taskManagerRunners;
        DispatcherProcess[] dispatcherProcesses;
        block20: {
            final File zookeeperStoragePath = TempDirUtils.newFolder((Path)this.tempDir);
            int numberOfJobManagers = 2;
            int numberOfTaskManagers = 2;
            int numberOfSlotsPerTaskManager = 2;
            Assertions.assertThat((int)4).isEqualTo(4);
            dispatcherProcesses = new DispatcherProcess[2];
            taskManagerRunners = new TaskManagerRunner[2];
            highAvailabilityServices = null;
            leaderRetrievalService = null;
            coordinateTempDir = null;
            Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig((String)this.zooKeeperExtension.getConnectString(), (String)zookeeperStoragePath.getPath());
            config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, (Object)MemorySize.parse((String)"4m"));
            config.set(TaskManagerOptions.NETWORK_MEMORY_MIN, (Object)MemorySize.parse((String)"3200k"));
            config.set(TaskManagerOptions.NETWORK_MEMORY_MAX, (Object)MemorySize.parse((String)"3200k"));
            config.set(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_BUFFERS, (Object)16);
            config.set(TaskManagerOptions.NUM_TASK_SLOTS, (Object)2);
            config.set(TaskManagerOptions.TASK_HEAP_MEMORY, (Object)MemorySize.parse((String)"128m"));
            config.set(TaskManagerOptions.CPU_CORES, (Object)1.0);
            TaskExecutorResourceUtils.adjustForLocalExecution((Configuration)config);
            rpcService = RpcSystem.load().remoteServiceBuilder(config, "localhost", "0").createAndStart();
            try {
                Deadline deadline = Deadline.fromNow((Duration)TEST_TIMEOUT);
                coordinateTempDir = TempDirUtils.newFolder((Path)this.tempDir);
                dispatcherProcesses[0] = new DispatcherProcess(0, config);
                dispatcherProcesses[0].startProcess();
                highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices((Configuration)config, (Executor)EXECUTOR_EXTENSION.getExecutor(), (FatalErrorHandler)NoOpFatalErrorHandler.INSTANCE);
                PluginManager pluginManager = PluginUtils.createPluginManagerFromRootFolder((Configuration)config);
                for (int i = 0; i < 2; ++i) {
                    taskManagerRunners[i] = new TaskManagerRunner(config, pluginManager, TaskManagerRunner::createTaskExecutorService);
                    taskManagerRunners[i].start();
                }
                TestingListener leaderListener = new TestingListener();
                leaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
                leaderRetrievalService.start((LeaderRetrievalListener)leaderListener);
                leaderListener.waitForNewLeader();
                String leaderAddress = leaderListener.getAddress();
                UUID leaderId = leaderListener.getLeaderSessionID();
                CompletableFuture dispatcherGatewayFuture = rpcService.connect(leaderAddress, (Serializable)DispatcherId.fromUuid((UUID)leaderId), DispatcherGateway.class);
                DispatcherGateway dispatcherGateway = (DispatcherGateway)dispatcherGatewayFuture.get();
                this.waitForTaskManagers(2, dispatcherGateway, deadline.timeLeft());
                final File coordinateDirClosure = coordinateTempDir;
                final Throwable[] errorRef = new Throwable[1];
                Thread programTrigger = new Thread("Program Trigger"){

                    @Override
                    public void run() {
                        try {
                            JobManagerHAProcessFailureRecoveryITCase.this.testJobManagerFailure(JobManagerHAProcessFailureRecoveryITCase.this.zooKeeperExtension.getConnectString(), coordinateDirClosure, zookeeperStoragePath);
                        }
                        catch (Throwable t) {
                            t.printStackTrace();
                            errorRef[0] = t;
                        }
                    }
                };
                programTrigger.start();
                AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(coordinateTempDir, READY_MARKER_FILE_PREFIX, 4, deadline.timeLeft().toMillis());
                dispatcherProcesses[0].destroy();
                dispatcherProcesses[1] = new DispatcherProcess(1, config);
                dispatcherProcesses[1].startProcess();
                AbstractTaskManagerProcessFailureRecoveryTest.touchFile(new File(coordinateTempDir, PROCEED_MARKER_FILE));
                programTrigger.join(deadline.timeLeft().toMillis());
                AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(coordinateTempDir, FINISH_MARKER_FILE_PREFIX, 1, deadline.timeLeft().toMillis());
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)programTrigger.isAlive()).as("The program did not finish in time", new Object[0])).isFalse();
                if (errorRef[0] == null) break block20;
                Throwable error = errorRef[0];
                error.printStackTrace();
                Assertions.fail((String)("The program encountered a " + error.getClass().getSimpleName() + " : " + error.getMessage()));
            }
            catch (Throwable t) {
                try {
                    t.printStackTrace();
                    for (DispatcherProcess p : dispatcherProcesses) {
                        if (p == null) continue;
                        p.printProcessLog();
                    }
                    throw t;
                }
                catch (Throwable throwable) {
                    for (int i = 0; i < 2; ++i) {
                        if (taskManagerRunners[i] == null) continue;
                        taskManagerRunners[i].close();
                    }
                    if (leaderRetrievalService != null) {
                        leaderRetrievalService.stop();
                    }
                    for (DispatcherProcess dispatcherProcess : dispatcherProcesses) {
                        if (dispatcherProcess == null) continue;
                        dispatcherProcess.destroy();
                    }
                    if (highAvailabilityServices != null) {
                        highAvailabilityServices.closeWithOptionalClean(true);
                    }
                    RpcUtils.terminateRpcService((RpcService[])new RpcService[]{rpcService});
                    if (coordinateTempDir != null) {
                        try {
                            FileUtils.deleteDirectory((File)coordinateTempDir);
                        }
                        catch (Throwable throwable2) {
                            // empty catch block
                        }
                    }
                    throw throwable;
                }
            }
        }
        for (int i = 0; i < 2; ++i) {
            if (taskManagerRunners[i] == null) continue;
            taskManagerRunners[i].close();
        }
        if (leaderRetrievalService != null) {
            leaderRetrievalService.stop();
        }
        for (DispatcherProcess dispatcherProcess : dispatcherProcesses) {
            if (dispatcherProcess == null) continue;
            dispatcherProcess.destroy();
        }
        if (highAvailabilityServices != null) {
            highAvailabilityServices.closeWithOptionalClean(true);
        }
        RpcUtils.terminateRpcService((RpcService[])new RpcService[]{rpcService});
        if (coordinateTempDir != null) {
            try {
                FileUtils.deleteDirectory((File)coordinateTempDir);
            }
            catch (Throwable i) {}
        }
    }

    private void waitForTaskManagers(int numberOfTaskManagers, DispatcherGateway dispatcherGateway, Duration timeLeft) throws ExecutionException, InterruptedException {
        FutureUtils.retrySuccessfulWithDelay(() -> dispatcherGateway.requestClusterOverview(timeLeft), (Duration)Duration.ofMillis(50L), (Deadline)Deadline.fromNow((Duration)timeLeft), clusterOverview -> clusterOverview.getNumTaskManagersConnected() >= numberOfTaskManagers, (ScheduledExecutor)new ScheduledExecutorServiceAdapter(Executors.newSingleThreadScheduledExecutor())).get();
    }
}

